You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2014/05/22 20:44:24 UTC

svn commit: r1596936 - in /jackrabbit/oak/branches/1.0: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java

Author: mreutegg
Date: Thu May 22 18:44:23 2014
New Revision: 1596936

URL: http://svn.apache.org/r1596936
Log:
OAK-1854: Duplicate revisions

Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1593245,1593250

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java?rev=1596936&r1=1596935&r2=1596936&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java Thu May 22 18:44:23 2014
@@ -157,6 +157,12 @@ public class Revision {
         long timestamp = getCurrentTimestamp();
         int c;
         synchronized (Revision.class) {
+            // need to check again, because threads
+            // could arrive inside the synchronized block
+            // out of order
+            if (timestamp < lastRevisionTimestamp) {
+                timestamp = lastRevisionTimestamp;
+            }
             if (timestamp == lastRevisionTimestamp) {
                 c = ++lastRevisionCount;
             } else {

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java?rev=1596936&r1=1596935&r2=1596936&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java Thu May 22 18:44:23 2014
@@ -16,12 +16,26 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.jackrabbit.oak.plugins.document.Revision.RevisionComparator;
 import org.junit.Test;
 
@@ -283,4 +297,109 @@ public class RevisionTest {
         assertEquals(new Revision(0x30, 0, 0), comp.getRevisionSeen(r21));
     }
 
+    @Test
+    public void uniqueRevision2() throws Exception {
+        List<Thread> threads = new ArrayList<Thread>();
+        final AtomicBoolean stop = new AtomicBoolean();
+        final Set<Revision> set = Collections
+                .synchronizedSet(new HashSet<Revision>());
+        final Revision[] duplicate = new Revision[1];
+        for (int i = 0; i < 20; i++) {
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    Revision[] last = new Revision[1024];
+                    while (!stop.get()) {
+                        for (Revision r : last) {
+                            set.remove(r);
+                        }
+                        for (int i = 0; i < last.length; i++) {
+                            last[i] = Revision.newRevision(1);
+                        }
+                        for (Revision r : last) {
+                            if (!set.add(r)) {
+                                duplicate[0] = r;
+                            }
+                        }
+                    }
+                }
+            });
+            thread.start();
+            threads.add(thread);
+        }
+        Thread.sleep(200);
+        stop.set(true);
+        for (Thread t : threads) {
+            t.join();
+        }
+        assertNull("Duplicate revision", duplicate[0]);
+    }
+
+    @Test
+    public void uniqueRevision() throws Exception {
+        //Revision.setClock(new Clock.Virtual());
+        final BlockingQueue<Revision> revisionQueue = Queues.newLinkedBlockingQueue();
+        int noOfThreads = 60;
+        final int noOfLoops = 1000;
+        List<Thread> workers = new ArrayList<Thread>();
+        final AtomicBoolean stop = new AtomicBoolean();
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch stopLatch = new CountDownLatch(noOfThreads);
+        for (int i = 0; i < noOfThreads; i++) {
+            workers.add(new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    Uninterruptibles.awaitUninterruptibly(startLatch);
+                    for (int j = 0; j < noOfLoops && !stop.get(); j++) {
+                        revisionQueue.add(Revision.newRevision(1));
+                    }
+                    stopLatch.countDown();
+                }
+            }));
+        }
+
+        final List<Revision> duplicates = Lists.newArrayList();
+        final Set<Revision> seenRevs = Sets.newHashSet();
+        workers.add(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                startLatch.countDown();
+
+                while (!stop.get()) {
+                    List<Revision> revs = Lists.newArrayList();
+                    Queues.drainUninterruptibly(revisionQueue, revs, 5, 100, TimeUnit.MILLISECONDS);
+                    record(revs);
+                }
+
+                List<Revision> revs = Lists.newArrayList();
+                revisionQueue.drainTo(revs);
+                record(revs);
+            }
+
+            private void record(List<Revision> revs) {
+                for (Revision rev : revs) {
+                    if (!seenRevs.add(rev)) {
+                        duplicates.add(rev);
+                    }
+                }
+
+                if (!duplicates.isEmpty()) {
+                    stop.set(true);
+                }
+            }
+        }));
+
+        for (Thread t : workers) {
+            t.start();
+        }
+
+        stopLatch.await();
+        stop.set(true);
+
+        for (Thread t : workers) {
+            t.join();
+        }
+        assertTrue(String.format("Duplicate rev seen %s %n Seen %s", duplicates, seenRevs), duplicates.isEmpty());
+    }
+
 }