You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2016/01/28 19:25:10 UTC

svn commit: r1727415 - in /jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document: BulkCreateOrUpdateClusterTest.java BulkCreateOrUpdateTest.java

Author: reschke
Date: Thu Jan 28 18:25:10 2016
New Revision: 1727415

URL: http://svn.apache.org/viewvc?rev=1727415&view=rev
Log:
OAK-3924: fix database-level row deadlock during bulk updates in RDB

add (ignored) tests

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java?rev=1727415&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java Thu Jan 28 18:25:10 2016
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static java.util.Collections.shuffle;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
+import org.junit.Test;
+
+public class BulkCreateOrUpdateClusterTest extends AbstractMultiDocumentStoreTest {
+
+    public BulkCreateOrUpdateClusterTest(DocumentStoreFixture dsf) {
+        super(dsf); // hand the fixture to AbstractMultiDocumentStoreTest
+    }
+
+    /**
+     * Runs multiple bulk updates concurrently, alternating the threads between
+     * {@code ds1} and {@code ds2}. Each thread modifies only its own documents.
+     */
+    @Test
+    public void testConcurrentNoConflict() throws InterruptedException {
+        int amountPerThread = 100;
+        int threadCount = 10;
+        int amount = amountPerThread * threadCount;
+
+        List<UpdateOp> updates = new ArrayList<UpdateOp>(amount);
+        // pre-create the even items so that the threads both update and create
+        for (int i = 0; i < amount; i += 2) {
+            String id = this.getClass().getName() + ".testConcurrentNoConflict" + i;
+            UpdateOp up = new UpdateOp(id, true);
+            up.set("_id", id);
+            up.set("prop", 100);
+            updates.add(up);
+        }
+        ds1.create(Collection.NODES, updates);
+
+        // written to by the worker threads, so the set has to be thread-safe
+        final Set<Exception> exceptions = java.util.Collections.synchronizedSet(new HashSet<Exception>());
+        List<Thread> threads = new ArrayList<Thread>();
+        final Map<String, NodeDocument> oldDocs = new ConcurrentHashMap<String, NodeDocument>();
+        for (int i = 0; i < threadCount; i++) {
+            final DocumentStore selectedDs = i % 2 == 0 ? this.ds1 : this.ds2;
+            final List<UpdateOp> threadUpdates = new ArrayList<UpdateOp>(amountPerThread);
+            for (int j = 0; j < amountPerThread; j++) {
+                String id = this.getClass().getName() + ".testConcurrentNoConflict" + (j + i * amountPerThread);
+                UpdateOp up = new UpdateOp(id, true);
+                up.set("_id", id);
+                up.set("prop", 200 + i + j);
+                threadUpdates.add(up);
+                removeMe.add(id);
+            }
+            shuffle(threadUpdates);
+            threads.add(new Thread() {
+                public void run() {
+                    try {
+                        for (NodeDocument d : selectedDs.createOrUpdate(Collection.NODES, threadUpdates)) {
+                            if (d != null) { // null entries are skipped; only returned documents are recorded
+                                oldDocs.put(d.getId(), d);
+                            }
+                        }
+                    } catch (Exception ex) {
+                        exceptions.add(ex);
+                    }
+                }
+            });
+        }
+
+        for (Thread t : threads) {
+            t.start();
+        }
+        for (Thread t : threads) {
+            t.join(10000); // bounded wait, otherwise the isAlive() check below can never fire
+            if (t.isAlive()) {
+                fail("Thread hasn't finished in 10s");
+            }
+        }
+
+        if (!exceptions.isEmpty()) {
+            String msg = exceptions.size() + " out of " + threadCount +  " failed with exceptions, the first being: " + exceptions.iterator().next();
+            fail(msg);
+        }
+
+        for (int i = 0; i < amount; i++) {
+            String id = this.getClass().getName() + ".testConcurrentNoConflict" + i;
+
+            NodeDocument oldDoc = oldDocs.get(id);
+            NodeDocument newDoc; //avoid cache issues
+            if (i % 2 == 1) {
+                assertNull("The returned value should be null for created doc", oldDoc);
+                newDoc = ds1.find(Collection.NODES, id);
+            } else {
+                assertNotNull("The returned doc shouldn't be null for updated doc", oldDoc);
+                assertEquals("The old value is not correct", 100L, oldDoc.get("prop"));
+                newDoc = ds2.find(Collection.NODES, id);
+            }
+            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
+        }
+    }
+
+    /**
+     * Runs multiple bulk updates concurrently through {@code ds1} and {@code ds2}.
+     * Each thread modifies the same set of documents; the even ones exist beforehand.
+     */
+    @Test
+    public void testConcurrentWithConflict() throws InterruptedException {
+        // see OAK-3924: not run against RDBDocumentStore
+        assumeTrue(!(ds instanceof RDBDocumentStore));
+        int threadCount = 10;
+        int amount = 500;
+
+        List<UpdateOp> updates = new ArrayList<UpdateOp>(amount);
+        // pre-create the even items, using the same ids the threads update below
+        for (int i = 0; i < amount; i += 2) {
+            String id = this.getClass().getName() + ".testConcurrentWithConflict" + i;
+            UpdateOp up = new UpdateOp(id, true);
+            up.set("_id", id);
+            up.set("prop", 100);
+            updates.add(up);
+            removeMe.add(id);
+        }
+        ds1.create(Collection.NODES, updates);
+
+        // written to by the worker threads, so the set has to be thread-safe
+        final Set<Exception> exceptions = java.util.Collections.synchronizedSet(new HashSet<Exception>());
+        List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < threadCount; i++) {
+            final DocumentStore selectedDs = i % 2 == 0 ? this.ds1 : this.ds2;
+            final List<UpdateOp> threadUpdates = new ArrayList<UpdateOp>(amount);
+            for (int j = 0; j < amount; j++) {
+                String id = this.getClass().getName() + ".testConcurrentWithConflict" + j;
+                UpdateOp up = new UpdateOp(id, true);
+                up.set("_id", id);
+                up.set("prop", 200 + i * amount + j);
+                threadUpdates.add(up);
+                removeMe.add(id);
+            }
+            shuffle(threadUpdates); // each thread gets its own random order of the same ids
+            threads.add(new Thread() {
+                public void run() {
+                    try {
+                        selectedDs.createOrUpdate(Collection.NODES, threadUpdates);
+                    } catch (Exception ex) {
+                        exceptions.add(ex);
+                    }
+                }
+            });
+        }
+
+        for (Thread t : threads) {
+            t.start();
+        }
+        for (Thread t : threads) {
+            t.join(10000);
+            if (t.isAlive()) {
+                fail("Thread hasn't finished in 10s");
+            }
+        }
+
+        if (!exceptions.isEmpty()) {
+            fail(exceptions.size() + " out of " + threadCount +  " failed with exceptions, the first being: " + exceptions.iterator().next());
+        }
+
+        for (int i = 0; i < amount; i++) {
+            String id = this.getClass().getName() + ".testConcurrentWithConflict" + i;
+
+            NodeDocument newDoc = ds1.find(Collection.NODES, id);
+            assertNotNull("The document hasn't been inserted", newDoc);
+            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
+
+            newDoc = ds2.find(Collection.NODES, id);
+            assertNotNull("The document hasn't been inserted", newDoc);
+            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java?rev=1727415&r1=1727414&r2=1727415&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java Thu Jan 28 18:25:10 2016
@@ -16,17 +16,27 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static java.util.Collections.shuffle;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
 
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceWrapper;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest {
@@ -155,6 +165,147 @@ public class BulkCreateOrUpdateTest exte
         }
     }
 
+    /**
+     * Runs multiple bulk updates concurrently against the single store {@code ds}.
+     * Each thread modifies only its own documents; the even ones exist beforehand.
+     */
+    @Test
+    public void testConcurrentNoConflict() throws InterruptedException {
+        int amountPerThread = 100;
+        int threadCount = 10;
+        int amount = amountPerThread * threadCount;
+
+        List<UpdateOp> updates = new ArrayList<UpdateOp>(amount);
+        // pre-create the even items so that the threads both update and create
+        for (int i = 0; i < amount; i += 2) {
+            String id = this.getClass().getName() + ".testConcurrentNoConflict" + i;
+            UpdateOp up = new UpdateOp(id, true);
+            up.set("_id", id);
+            up.set("prop", 100);
+            updates.add(up);
+        }
+        ds.create(Collection.NODES, updates);
+
+        List<Thread> threads = new ArrayList<Thread>();
+        final Map<String, NodeDocument> oldDocs = new ConcurrentHashMap<String, NodeDocument>();
+        for (int i = 0; i < threadCount; i++) {
+            final List<UpdateOp> threadUpdates = new ArrayList<UpdateOp>(amountPerThread);
+            for (int j = 0; j < amountPerThread; j++) {
+                String id = this.getClass().getName() + ".testConcurrentNoConflict" + (j + i * amountPerThread);
+                UpdateOp up = new UpdateOp(id, true);
+                up.set("_id", id);
+                up.set("prop", 200 + i + j);
+                threadUpdates.add(up);
+                removeMe.add(id);
+            }
+            shuffle(threadUpdates);
+            threads.add(new Thread() {
+                public void run() {
+                    for (NodeDocument d : ds.createOrUpdate(Collection.NODES, threadUpdates)) {
+                        if (d != null) { // null entries are skipped; only returned documents are recorded
+                            oldDocs.put(d.getId(), d);
+                        }
+                    }
+                }
+            });
+        }
+
+        for (Thread t : threads) {
+            t.start();
+        }
+        for (Thread t : threads) {
+            t.join(10000); // bounded wait, otherwise the isAlive() check below can never fire
+            if (t.isAlive()) {
+                fail("Thread hasn't finished in 10s");
+            }
+        }
+
+        for (int i = 0; i < amount; i++) {
+            String id = this.getClass().getName() + ".testConcurrentNoConflict" + i;
+
+            NodeDocument oldDoc = oldDocs.get(id);
+            NodeDocument newDoc = ds.find(Collection.NODES, id);
+            if (i % 2 == 1) {
+                assertNull("The returned value should be null for created doc", oldDoc);
+            } else {
+                assertNotNull("The returned doc shouldn't be null for updated doc", oldDoc);
+                assertEquals("The old value is not correct", 100L, oldDoc.get("prop"));
+            }
+            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
+        }
+    }
+
+    /**
+     * Runs multiple bulk updates concurrently against the single store {@code ds}.
+     * Each thread modifies the same set of documents; the even ones exist beforehand.
+     */
+    @Test
+    public void testConcurrentWithConflict() throws InterruptedException {
+        // see OAK-3924: not run against RDBDocumentStore
+        assumeTrue(!(ds instanceof RDBDocumentStore));
+        int threadCount = 10;
+        int amount = 500;
+
+        List<UpdateOp> updates = new ArrayList<UpdateOp>(amount);
+        // pre-create the even items, using the same ids the threads update below
+        for (int i = 0; i < amount; i += 2) {
+            String id = this.getClass().getName() + ".testConcurrentWithConflict" + i;
+            UpdateOp up = new UpdateOp(id, true);
+            up.set("_id", id);
+            up.set("prop", 100);
+            updates.add(up);
+            removeMe.add(id);
+        }
+        ds.create(Collection.NODES, updates);
+
+        // written to by the worker threads, so the set has to be thread-safe
+        final Set<Exception> exceptions = java.util.Collections.synchronizedSet(new HashSet<Exception>());
+        List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < threadCount; i++) {
+            final List<UpdateOp> threadUpdates = new ArrayList<UpdateOp>(amount);
+            for (int j = 0; j < amount; j++) {
+                String id = this.getClass().getName() + ".testConcurrentWithConflict" + j;
+                UpdateOp up = new UpdateOp(id, true);
+                up.set("_id", id);
+                up.set("prop", 200 + i * amount + j);
+                threadUpdates.add(up);
+                removeMe.add(id);
+            }
+            shuffle(threadUpdates); // each thread gets its own random order of the same ids
+            threads.add(new Thread() {
+                public void run() {
+                    try {
+                        ds.createOrUpdate(Collection.NODES, threadUpdates);
+                    } catch (Exception ex) {
+                        exceptions.add(ex);
+                    }
+                }
+            });
+        }
+
+        for (Thread t : threads) {
+            t.start();
+        }
+        for (Thread t : threads) {
+            t.join(10000);
+            if (t.isAlive()) {
+                fail("Thread hasn't finished in 10s");
+            }
+        }
+
+        if (!exceptions.isEmpty()) {
+            fail(exceptions.size() + " out of " + threadCount +  " failed with exceptions, the first being: " + exceptions.iterator().next());
+        }
+
+        for (int i = 0; i < amount; i++) {
+            String id = this.getClass().getName() + ".testConcurrentWithConflict" + i;
+
+            NodeDocument newDoc = ds.find(Collection.NODES, id);
+            assertNotNull("The document hasn't been inserted", newDoc);
+            assertNotEquals("The document hasn't been updated", 100L, newDoc.get("prop"));
+        }
+    }
+
     /**
      * This method adds a few updateOperations modifying the same document.
      */