You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/05/08 17:17:29 UTC

svn commit: r1480319 - in /jackrabbit/oak/trunk/oak-mongomk/src: main/java/org/apache/jackrabbit/mongomk/ test/java/org/apache/jackrabbit/mongomk/

Author: thomasm
Date: Wed May  8 15:17:22 2013
New Revision: 1480319

URL: http://svn.apache.org/r1480319
Log:
OAK-762 MongoMK: automatic unique cluster id / revision order depends on the cluster id (WIP)

Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java Wed May  8 15:17:22 2013
@@ -265,7 +265,7 @@ public class Commit {
         Map<String, Object> map = store.createOrUpdate(Collection.NODES, op);
         if (baseRevision != null) {
             final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
-            Revision newestRev = mk.getNewestRevision(map, revision, true,
+            Revision newestRev = mk.getNewestRevision(map, revision,
                     new CollisionHandler() {
                 @Override
                 void uncommittedModification(Revision uncommitted) {
@@ -296,9 +296,6 @@ public class Commit {
                 }
             }
             if (conflict != null) {
-                if (newestRev != null) {
-                    mk.publishRevision(newestRev);
-                }
                 throw conflict;
             }
             // if we get here the modification was successful

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Wed May  8 15:17:22 2013
@@ -215,7 +215,7 @@ public class MongoMK implements MicroKer
         init();
         // initial reading of the revisions of other cluster nodes
         backgroundRead();
-        revisionComparator.add(headRevision, Revision.getCurrentTimestamp() + 1);
+        revisionComparator.add(headRevision, Revision.newRevision(0));
         headRevision = newRevision();
         LOG.info("Initialized MongoMK with clusterNodeId: {}", clusterId);
     }
@@ -301,7 +301,10 @@ public class MongoMK implements MicroKer
         Map<String, String> lastRevMap = (Map<String, String>) map.get(UpdateOp.LAST_REV);
         
         boolean hasNewRevisions = false;
-        long timestamp = Revision.getCurrentTimestamp();
+        // the (old) head occurred first
+        Revision headSeen = Revision.newRevision(0);
+        // then we saw this new revision (from another cluster node) 
+        Revision otherSeen = Revision.newRevision(0);
         for (Entry<String, String> e : lastRevMap.entrySet()) {
             int machineId = Integer.parseInt(e.getKey());
             if (machineId == clusterId) {
@@ -312,7 +315,7 @@ public class MongoMK implements MicroKer
             if (last == null || r.compareRevisionTime(last) > 0) {
                 lastKnownRevision.put(machineId, r);
                 hasNewRevisions = true;
-                revisionComparator.add(r, timestamp);
+                revisionComparator.add(r, otherSeen);
             }
         }
         if (hasNewRevisions) {
@@ -323,44 +326,13 @@ public class MongoMK implements MicroKer
             Revision r = Revision.newRevision(clusterId);
             // the latest revisions of the current cluster node
             // happened before the latest revisions of other cluster nodes
-            revisionComparator.add(r, timestamp - 1);
+            revisionComparator.add(r, headSeen);
             // the head revision is after other revisions
             headRevision = Revision.newRevision(clusterId);
         }
-        revisionComparator.purge(timestamp - REMEMBER_REVISION_ORDER_MILLIS);
+        revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
     }
     
-    /**
-     * Ensure the revision visible from now on, possibly by updating the head
-     * revision, so that the changes that occurred are visible.
-     * 
-     * @param revision the revision
-     */
-    void publishRevision(Revision revision) {  
-        if (revisionComparator.compare(headRevision, revision) >= 0) {
-            // already visible
-            return;
-        }
-        int clusterNodeId = revision.getClusterId();
-        if (clusterNodeId == this.clusterId) {
-            return;
-        }
-        long timestamp = Revision.getCurrentTimestamp();
-        revisionComparator.add(revision, timestamp);
-        // TODO invalidating the whole cache is not really needed,
-        // but how to ensure we invalidate the right part of the cache?
-        // possibly simply wait for the background thread to pick
-        // up the changes, but this depends on how often this method is called
-        store.invalidateCache();
-        // add a new revision, so that changes are visible
-        headRevision = Revision.newRevision(clusterId);
-        // the latest revisions of the current cluster node
-        // happened before the latest revisions of other cluster nodes
-        revisionComparator.add(headRevision, timestamp - 1);
-        // the head revision is after other revisions
-        headRevision = Revision.newRevision(clusterId);
-    }
-
     void backgroundWrite() {
         if (unsavedLastRevisions.size() == 0) {
             return;
@@ -1143,8 +1115,7 @@ public class MongoMK implements MicroKer
      * Get the revision of the latest change made to this node.
      * 
      * @param nodeMap the document
-     * @param readRevision the returned value is guaranteed to _not_ match this revision,
-     *              but it might be in this branch
+     * @param changeRev the revision of the current change
      * @param onlyCommitted whether only committed changes should be considered
      * @param handler the conflict handler, which is called for un-committed revisions
      *                preceding <code>before</code>.
@@ -1152,12 +1123,10 @@ public class MongoMK implements MicroKer
      */
     @SuppressWarnings("unchecked")
     @Nullable Revision getNewestRevision(Map<String, Object> nodeMap,
-                                         Revision except, boolean onlyCommitted,
-                                         CollisionHandler handler) {
+                                         Revision changeRev, CollisionHandler handler) {
         if (nodeMap == null) {
             return null;
         }
-        // TODO remove "except"
         SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
         if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
             revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
@@ -1173,10 +1142,16 @@ public class MongoMK implements MicroKer
         Revision newestRev = null;
         for (String r : revisions) {
             Revision propRev = Revision.fromString(r);
+            if (isRevisionNewer(propRev, changeRev)) {
+                // we have seen a previous change from another cluster node
+                // (which might be conflicting or not) - we need to make
+                // sure this change is visible from now on
+                publishRevision(propRev, changeRev);
+            }
             if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
-                if (!propRev.equals(except)) {
-                    if (onlyCommitted && !isValidRevision(
-                            propRev, except, nodeMap, new HashSet<Revision>())) {
+                if (!propRev.equals(changeRev)) {
+                    if (!isValidRevision(
+                            propRev, changeRev, nodeMap, new HashSet<Revision>())) {
                         handler.uncommittedModification(propRev);
                     } else {
                         newestRev = propRev;
@@ -1197,6 +1172,42 @@ public class MongoMK implements MicroKer
         return newestRev;
     }
     
+    /**
+     * Ensure the revision is visible from now on, possibly by updating the
+     * head revision, so that the changes that occurred are visible.
+     * 
+     * @param foreignRevision the revision from another cluster node
+     * @param changeRevision the local revision that is sorted after the foreign revision
+     */
+    private void publishRevision(Revision foreignRevision, Revision changeRevision) {  
+        if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
+            // already visible
+            return;
+        }
+        int clusterNodeId = foreignRevision.getClusterId();
+        if (clusterNodeId == this.clusterId) {
+            return;
+        }
+        // the (old) head occurred first
+        Revision headSeen = Revision.newRevision(0);
+        // then we saw this new revision (from another cluster node)
+        Revision otherSeen = Revision.newRevision(0);
+        // and after that, the current change
+        Revision changeSeen = Revision.newRevision(0);
+        revisionComparator.add(foreignRevision, otherSeen);
+        // TODO invalidating the whole cache is not really needed,
+        // but how to ensure we invalidate the right part of the cache?
+        // possibly simply wait for the background thread to pick
+        // up the changes, but this depends on how often this method is called
+        store.invalidateCache();
+        // register the old head under headSeen (created before otherSeen)
+        // and the current change under changeSeen (created after otherSeen)
+        revisionComparator.add(headRevision, headSeen);
+        revisionComparator.add(changeRevision, changeSeen);
+        // the head revision is after other revisions
+        headRevision = Revision.newRevision(clusterId);
+    }
+    
     private static String stripBranchRevMarker(String revisionId) {
         if (revisionId.startsWith("b")) {
             return revisionId.substring(1);

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java Wed May  8 15:17:22 2013
@@ -195,13 +195,13 @@ public class Revision {
         Revision revision;
 
         /**
-         * The (local) timestamp; the time when this revision was seen by this
+         * The (local) revision that marks when this revision was seen by this
          * cluster instance.
          */
-        long timestamp;
+        Revision seenAt;
         
         public String toString() {
-            return revision + ":" + timestamp;
+            return revision + ":" + seenAt;
         }
         
     }
@@ -211,6 +211,10 @@ public class Revision {
      * It contains a map of revision ranges.
      */
     public static class RevisionComparator implements Comparator<Revision> {
+
+        private static final Revision NEWEST = new Revision(Long.MAX_VALUE, 0, 0);
+
+        private static final Revision FUTURE = new Revision(Long.MAX_VALUE, Integer.MAX_VALUE, 0);
         
         /**
          * The map of cluster instances to lists of revision ranges.
@@ -270,7 +274,7 @@ public class Revision {
             int i = 0;
             for (; i < list.size(); i++) {
                 RevisionRange r = list.get(i);
-                if (r.timestamp > oldestTimestamp) {
+                if (r.seenAt.getTimestamp() > oldestTimestamp) {
                     break;
                 }
             }
@@ -287,9 +291,9 @@ public class Revision {
          * If an entry for this timestamp already exists, it is replaced.
          * 
          * @param r the revision
-         * @param timestamp the timestamp
+         * @param seenAt the (local) revision at which this revision was seen
          */
-        public void add(Revision r, long timestamp) {
+        public void add(Revision r, Revision seenAt) {
             int clusterId = r.getClusterId();
             while (true) {
                 List<RevisionRange> list = map.get(clusterId);
@@ -298,7 +302,7 @@ public class Revision {
                     newList = new ArrayList<RevisionRange>();
                 } else {
                     RevisionRange last = list.get(list.size() - 1);
-                    if (last.timestamp == timestamp) {
+                    if (last.seenAt.equals(seenAt)) {
                         // replace existing
                         if (r.compareRevisionTime(last.revision) > 0) {
                             // but only if newer
@@ -306,10 +310,13 @@ public class Revision {
                         }
                         return;
                     }
+                    if (last.revision.compareRevisionTime(r) > 0) {
+                        throw new IllegalArgumentException("Can not add an earlier revision");
+                    }
                     newList = new ArrayList<RevisionRange>(list);
                 }
                 RevisionRange range = new RevisionRange();
-                range.timestamp = timestamp;
+                range.seenAt = seenAt;
                 range.revision = r;
                 newList.add(range);
                 if (list == null) {
@@ -329,31 +336,16 @@ public class Revision {
             if (o1.getClusterId() == o2.getClusterId()) {
                 return o1.compareRevisionTime(o2);
             }
-            long range1 = getRevisionRangeTimestamp(o1);
-            long range2 = getRevisionRangeTimestamp(o2);
-            if (range1 == 0 || range2 == 0) {
+            Revision range1 = getRevisionSeen(o1);
+            Revision range2 = getRevisionSeen(o2);
+            if (range1 == null || range2 == null) {
                 return o1.compareRevisionTime(o2);
             }
-            if (range1 != range2) {
-                return range1 < range2 ? -1 : 1;
+            int comp = range1.compareRevisionTime(range2);
+            if (comp != 0) {
+                return comp;
             }
-            if (range1 == Long.MAX_VALUE) {
-                // in this case, both must be Long.MAX_VALUE, otherwise
-                // the previous check would have been true; and additionally
-                // the revisions are from different cluster nodes
-                if (o1.getClusterId() == currentClusterNodeId) {
-                    return 1;
-                } else if (o2.getClusterId() == currentClusterNodeId) {
-                    return -1;
-                }
-                // both revisions are new revisions of other cluster nodes
-                // (in reality this doesn't actually happen I believe)
-            }
-            int result = o1.compareRevisionTime(o2);
-            if (result != 0) {
-                return result;
-            }
-            return o1.getClusterId() < o2.getClusterId() ? -1 : 1;
+            return Integer.signum(o1.getClusterId() - o2.getClusterId());
         }
         
         /**
@@ -364,19 +356,19 @@ public class Revision {
          * returned.
          * 
          * @param r the revision
-         * @return the timestamp, 0 if not found, 
+         * @return the revision where it was seen, null if not found, 
          *      the timestamp plus 1 second for new local revisions;
          *      Long.MAX_VALUE for new non-local revisions (meaning 'in the future')
          */
-        private long getRevisionRangeTimestamp(Revision r) {
+        private Revision getRevisionSeen(Revision r) {
             List<RevisionRange> list = map.get(r.getClusterId());
             if (list == null) {
-                return 0;
+                return null;
             }
             // search from latest backward
             // (binary search could be used, but we expect most queries
             // at the end of the list)
-            long result = 0;
+            Revision result = null;
             for (int i = list.size() - 1; i >= 0; i--) {
                 RevisionRange range = list.get(i);
                 int compare = r.compareRevisionTime(range.revision);
@@ -384,15 +376,15 @@ public class Revision {
                     if (i == list.size() - 1) {
                         // newer than the newest range
                         if (r.getClusterId() == currentClusterNodeId) {
-                            // newer than all 
-                            return range.timestamp + 1000;
+                            // newer than all others, except for FUTURE
+                            return NEWEST;
                         }
                         // happenes in the future (not visible yet)
-                        return Long.MAX_VALUE;
+                        return FUTURE;
                     }
                     break;
                 }
-                result = range.timestamp;
+                result = range.seenAt;
             }
             return result;
         }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java Wed May  8 15:17:22 2013
@@ -23,7 +23,6 @@ import org.apache.jackrabbit.mk.api.Micr
 import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.mongodb.DB;
@@ -54,7 +53,6 @@ public class ClusterTest {
     }
     
     @Test
-    @Ignore
     public void openCloseOpen() {
         MemoryDocumentStore ds = new MemoryDocumentStore();
         MemoryBlobStore bs = new MemoryBlobStore();
@@ -71,9 +69,29 @@ public class ClusterTest {
         MongoMK mk2 = builder.setClusterId(2).open();
         mk2.commit("/", "+\"a\": {}", null, null);
         mk2.commit("/", "-\"a\"", null, null);
-        
+
+        builder = new MongoMK.Builder();
+        builder.setDocumentStore(ds).setBlobStore(bs);
+        MongoMK mk3 = builder.setClusterId(3).open();
+        mk3.commit("/", "+\"a\": {}", null, null);
+        mk3.commit("/", "-\"a\"", null, null);
+
+        builder = new MongoMK.Builder();
+        builder.setDocumentStore(ds).setBlobStore(bs);
+        MongoMK mk4 = builder.setClusterId(4).open();
+        mk4.commit("/", "+\"a\": {}", null, null);
+
+        builder = new MongoMK.Builder();
+        builder.setDocumentStore(ds).setBlobStore(bs);
+        MongoMK mk5 = builder.setClusterId(5).open();
+        mk5.commit("/", "-\"a\"", null, null);
+        mk5.commit("/", "+\"a\": {}", null, null);
+
         mk1.dispose();
         mk2.dispose();
+        mk3.dispose();
+        mk4.dispose();
+        mk5.dispose();
     }    
     
     @Test

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java Wed May  8 15:17:22 2013
@@ -134,27 +134,27 @@ public class RevisionTest {
         assertEquals(-1, comp.compare(r3c1, r3c2));
 
         // now we declare r2+r3 of c1 to be after r2+r3 of c2
-        comp.add(r2c1, 20);
-        comp.add(r2c2, 10);
+        comp.add(r2c1, new Revision(0x20, 0, 0));
+        comp.add(r2c2, new Revision(0x10, 0, 0));
 
         assertEquals(
-                "1:\n r120-0-1:20\n" + 
-                "2:\n r200-0-2:10\n", comp.toString());
+                "1:\n r120-0-1:r20-0-0\n" + 
+                "2:\n r200-0-2:r10-0-0\n", comp.toString());
 
         assertEquals(1, comp.compare(r1c1, r1c2));
         assertEquals(1, comp.compare(r2c1, r2c2));
-        // r3c2 is still "in the future"
+        // both r3cx are still "in the future"
         assertEquals(-1, comp.compare(r3c1, r3c2));
         
         // now we declare r3 of c1 to be before r3 of c2
         // (with the same range timestamp, 
         // the revision timestamps are compared)
-        comp.add(r3c1, 30);
-        comp.add(r3c2, 30);
+        comp.add(r3c1, new Revision(0x30, 0, 0));
+        comp.add(r3c2, new Revision(0x30, 0, 0));
 
         assertEquals(
-                "1:\n r120-0-1:20 r130-0-1:30\n" + 
-                "2:\n r200-0-2:10 r300-0-2:30\n", comp.toString());
+                "1:\n r120-0-1:r20-0-0 r130-0-1:r30-0-0\n" + 
+                "2:\n r200-0-2:r10-0-0 r300-0-2:r30-0-0\n", comp.toString());
 
         assertEquals(1, comp.compare(r1c1, r1c2));
         assertEquals(1, comp.compare(r2c1, r2c2));
@@ -165,22 +165,22 @@ public class RevisionTest {
         assertEquals(1, comp.compare(r3c2, r3c1));
         
         // get rid of old timestamps
-        comp.purge(10);
+        comp.purge(0x10);
         assertEquals(
-                "1:\n r120-0-1:20 r130-0-1:30\n" + 
-                "2:\n r300-0-2:30\n", comp.toString());
-        comp.purge(20);
+                "1:\n r120-0-1:r20-0-0 r130-0-1:r30-0-0\n" + 
+                "2:\n r300-0-2:r30-0-0\n", comp.toString());
+        comp.purge(0x20);
         assertEquals(
-                "1:\n r130-0-1:30\n" + 
-                "2:\n r300-0-2:30\n", comp.toString());
+                "1:\n r130-0-1:r30-0-0\n" + 
+                "2:\n r300-0-2:r30-0-0\n", comp.toString());
         
         // update an entry
-        comp.add(new Revision(0x301, 1, 2), 30);
+        comp.add(new Revision(0x301, 1, 2), new Revision(0x30, 0, 0));
         assertEquals(
-                "1:\n r130-0-1:30\n" + 
-                "2:\n r301-1-2:30\n", comp.toString());
+                "1:\n r130-0-1:r30-0-0\n" + 
+                "2:\n r301-1-2:r30-0-0\n", comp.toString());
         
-        comp.purge(30);
+        comp.purge(0x30);
         assertEquals("", comp.toString());
 
     }