You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/05/08 17:17:29 UTC
svn commit: r1480319 - in /jackrabbit/oak/trunk/oak-mongomk/src:
main/java/org/apache/jackrabbit/mongomk/
test/java/org/apache/jackrabbit/mongomk/
Author: thomasm
Date: Wed May 8 15:17:22 2013
New Revision: 1480319
URL: http://svn.apache.org/r1480319
Log:
OAK-762 MongoMK: automatic unique cluster id / revision order depends on the cluster id (WIP)
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java Wed May 8 15:17:22 2013
@@ -265,7 +265,7 @@ public class Commit {
Map<String, Object> map = store.createOrUpdate(Collection.NODES, op);
if (baseRevision != null) {
final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
- Revision newestRev = mk.getNewestRevision(map, revision, true,
+ Revision newestRev = mk.getNewestRevision(map, revision,
new CollisionHandler() {
@Override
void uncommittedModification(Revision uncommitted) {
@@ -296,9 +296,6 @@ public class Commit {
}
}
if (conflict != null) {
- if (newestRev != null) {
- mk.publishRevision(newestRev);
- }
throw conflict;
}
// if we get here the modification was successful
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Wed May 8 15:17:22 2013
@@ -215,7 +215,7 @@ public class MongoMK implements MicroKer
init();
// initial reading of the revisions of other cluster nodes
backgroundRead();
- revisionComparator.add(headRevision, Revision.getCurrentTimestamp() + 1);
+ revisionComparator.add(headRevision, Revision.newRevision(0));
headRevision = newRevision();
LOG.info("Initialized MongoMK with clusterNodeId: {}", clusterId);
}
@@ -301,7 +301,10 @@ public class MongoMK implements MicroKer
Map<String, String> lastRevMap = (Map<String, String>) map.get(UpdateOp.LAST_REV);
boolean hasNewRevisions = false;
- long timestamp = Revision.getCurrentTimestamp();
+ // the (old) head occurred first
+ Revision headSeen = Revision.newRevision(0);
+ // then we saw this new revision (from another cluster node)
+ Revision otherSeen = Revision.newRevision(0);
for (Entry<String, String> e : lastRevMap.entrySet()) {
int machineId = Integer.parseInt(e.getKey());
if (machineId == clusterId) {
@@ -312,7 +315,7 @@ public class MongoMK implements MicroKer
if (last == null || r.compareRevisionTime(last) > 0) {
lastKnownRevision.put(machineId, r);
hasNewRevisions = true;
- revisionComparator.add(r, timestamp);
+ revisionComparator.add(r, otherSeen);
}
}
if (hasNewRevisions) {
@@ -323,44 +326,13 @@ public class MongoMK implements MicroKer
Revision r = Revision.newRevision(clusterId);
// the latest revisions of the current cluster node
// happened before the latest revisions of other cluster nodes
- revisionComparator.add(r, timestamp - 1);
+ revisionComparator.add(r, headSeen);
// the head revision is after other revisions
headRevision = Revision.newRevision(clusterId);
}
- revisionComparator.purge(timestamp - REMEMBER_REVISION_ORDER_MILLIS);
+ revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
}
- /**
- * Ensure the revision visible from now on, possibly by updating the head
- * revision, so that the changes that occurred are visible.
- *
- * @param revision the revision
- */
- void publishRevision(Revision revision) {
- if (revisionComparator.compare(headRevision, revision) >= 0) {
- // already visible
- return;
- }
- int clusterNodeId = revision.getClusterId();
- if (clusterNodeId == this.clusterId) {
- return;
- }
- long timestamp = Revision.getCurrentTimestamp();
- revisionComparator.add(revision, timestamp);
- // TODO invalidating the whole cache is not really needed,
- // but how to ensure we invalidate the right part of the cache?
- // possibly simply wait for the background thread to pick
- // up the changes, but this depends on how often this method is called
- store.invalidateCache();
- // add a new revision, so that changes are visible
- headRevision = Revision.newRevision(clusterId);
- // the latest revisions of the current cluster node
- // happened before the latest revisions of other cluster nodes
- revisionComparator.add(headRevision, timestamp - 1);
- // the head revision is after other revisions
- headRevision = Revision.newRevision(clusterId);
- }
-
void backgroundWrite() {
if (unsavedLastRevisions.size() == 0) {
return;
@@ -1143,8 +1115,7 @@ public class MongoMK implements MicroKer
* Get the revision of the latest change made to this node.
*
* @param nodeMap the document
- * @param readRevision the returned value is guaranteed to _not_ match this revision,
- * but it might be in this branch
+ * @param changeRev the revision of the current change
* @param onlyCommitted whether only committed changes should be considered
* @param handler the conflict handler, which is called for un-committed revisions
* preceding <code>before</code>.
@@ -1152,12 +1123,10 @@ public class MongoMK implements MicroKer
*/
@SuppressWarnings("unchecked")
@Nullable Revision getNewestRevision(Map<String, Object> nodeMap,
- Revision except, boolean onlyCommitted,
- CollisionHandler handler) {
+ Revision changeRev, CollisionHandler handler) {
if (nodeMap == null) {
return null;
}
- // TODO remove "except"
SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
@@ -1173,10 +1142,16 @@ public class MongoMK implements MicroKer
Revision newestRev = null;
for (String r : revisions) {
Revision propRev = Revision.fromString(r);
+ if (isRevisionNewer(propRev, changeRev)) {
+ // we have seen a previous change from another cluster node
+ // (which might be conflicting or not) - we need to make
+ // sure this change is visible from now on
+ publishRevision(propRev, changeRev);
+ }
if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
- if (!propRev.equals(except)) {
- if (onlyCommitted && !isValidRevision(
- propRev, except, nodeMap, new HashSet<Revision>())) {
+ if (!propRev.equals(changeRev)) {
+ if (!isValidRevision(
+ propRev, changeRev, nodeMap, new HashSet<Revision>())) {
handler.uncommittedModification(propRev);
} else {
newestRev = propRev;
@@ -1197,6 +1172,42 @@ public class MongoMK implements MicroKer
return newestRev;
}
+ /**
+ * Ensure the revision is visible from now on, possibly by updating the head
+ * revision, so that the changes that occurred are visible.
+ *
+ * @param foreignRevision the revision from another cluster node
+ * @param changeRevision the local revision that is sorted after the foreign revision
+ */
+ private void publishRevision(Revision foreignRevision, Revision changeRevision) {
+ if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
+ // already visible
+ return;
+ }
+ int clusterNodeId = foreignRevision.getClusterId();
+ if (clusterNodeId == this.clusterId) {
+ return;
+ }
+ // the (old) head occurred first
+ Revision headSeen = Revision.newRevision(0);
+ // then we saw this new revision (from another cluster node)
+ Revision otherSeen = Revision.newRevision(0);
+ // and after that, the current change
+ Revision changeSeen = Revision.newRevision(0);
+ revisionComparator.add(foreignRevision, otherSeen);
+ // TODO invalidating the whole cache is not really needed,
+ // but how to ensure we invalidate the right part of the cache?
+ // possibly simply wait for the background thread to pick
+ // up the changes, but this depends on how often this method is called
+ store.invalidateCache();
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(headRevision, headSeen);
+ revisionComparator.add(changeRevision, changeSeen);
+ // the head revision is after other revisions
+ headRevision = Revision.newRevision(clusterId);
+ }
+
private static String stripBranchRevMarker(String revisionId) {
if (revisionId.startsWith("b")) {
return revisionId.substring(1);
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java Wed May 8 15:17:22 2013
@@ -195,13 +195,13 @@ public class Revision {
Revision revision;
/**
- * The (local) timestamp; the time when this revision was seen by this
+ * The (local) revision; the time when this revision was seen by this
* cluster instance.
*/
- long timestamp;
+ Revision seenAt;
public String toString() {
- return revision + ":" + timestamp;
+ return revision + ":" + seenAt;
}
}
@@ -211,6 +211,10 @@ public class Revision {
* It contains a map of revision ranges.
*/
public static class RevisionComparator implements Comparator<Revision> {
+
+ private static final Revision NEWEST = new Revision(Long.MAX_VALUE, 0, 0);
+
+ private static final Revision FUTURE = new Revision(Long.MAX_VALUE, Integer.MAX_VALUE, 0);
/**
* The map of cluster instances to lists of revision ranges.
@@ -270,7 +274,7 @@ public class Revision {
int i = 0;
for (; i < list.size(); i++) {
RevisionRange r = list.get(i);
- if (r.timestamp > oldestTimestamp) {
+ if (r.seenAt.getTimestamp() > oldestTimestamp) {
break;
}
}
@@ -287,9 +291,9 @@ public class Revision {
* If an entry for this timestamp already exists, it is replaced.
*
* @param r the revision
- * @param timestamp the timestamp
+ * @param seenAt the (local) revision where this revision was seen here
*/
- public void add(Revision r, long timestamp) {
+ public void add(Revision r, Revision seenAt) {
int clusterId = r.getClusterId();
while (true) {
List<RevisionRange> list = map.get(clusterId);
@@ -298,7 +302,7 @@ public class Revision {
newList = new ArrayList<RevisionRange>();
} else {
RevisionRange last = list.get(list.size() - 1);
- if (last.timestamp == timestamp) {
+ if (last.seenAt.equals(seenAt)) {
// replace existing
if (r.compareRevisionTime(last.revision) > 0) {
// but only if newer
@@ -306,10 +310,13 @@ public class Revision {
}
return;
}
+ if (last.revision.compareRevisionTime(r) > 0) {
+ throw new IllegalArgumentException("Can not add an earlier revision");
+ }
newList = new ArrayList<RevisionRange>(list);
}
RevisionRange range = new RevisionRange();
- range.timestamp = timestamp;
+ range.seenAt = seenAt;
range.revision = r;
newList.add(range);
if (list == null) {
@@ -329,31 +336,16 @@ public class Revision {
if (o1.getClusterId() == o2.getClusterId()) {
return o1.compareRevisionTime(o2);
}
- long range1 = getRevisionRangeTimestamp(o1);
- long range2 = getRevisionRangeTimestamp(o2);
- if (range1 == 0 || range2 == 0) {
+ Revision range1 = getRevisionSeen(o1);
+ Revision range2 = getRevisionSeen(o2);
+ if (range1 == null || range2 == null) {
return o1.compareRevisionTime(o2);
}
- if (range1 != range2) {
- return range1 < range2 ? -1 : 1;
+ int comp = range1.compareRevisionTime(range2);
+ if (comp != 0) {
+ return comp;
}
- if (range1 == Long.MAX_VALUE) {
- // in this case, both must be Long.MAX_VALUE, otherwise
- // the previous check would have been true; and additionally
- // the revisions are from different cluster nodes
- if (o1.getClusterId() == currentClusterNodeId) {
- return 1;
- } else if (o2.getClusterId() == currentClusterNodeId) {
- return -1;
- }
- // both revisions are new revisions of other cluster nodes
- // (in reality this doesn't actually happen I believe)
- }
- int result = o1.compareRevisionTime(o2);
- if (result != 0) {
- return result;
- }
- return o1.getClusterId() < o2.getClusterId() ? -1 : 1;
+ return Integer.signum(o1.getClusterId() - o2.getClusterId());
}
/**
@@ -364,19 +356,19 @@ public class Revision {
* returned.
*
* @param r the revision
- * @return the timestamp, 0 if not found,
+ * @return the revision where it was seen, null if not found,
* NEWEST for new local revisions (newer than all others, except FUTURE);
* FUTURE for new non-local revisions (meaning 'in the future')
*/
- private long getRevisionRangeTimestamp(Revision r) {
+ private Revision getRevisionSeen(Revision r) {
List<RevisionRange> list = map.get(r.getClusterId());
if (list == null) {
- return 0;
+ return null;
}
// search from latest backward
// (binary search could be used, but we expect most queries
// at the end of the list)
- long result = 0;
+ Revision result = null;
for (int i = list.size() - 1; i >= 0; i--) {
RevisionRange range = list.get(i);
int compare = r.compareRevisionTime(range.revision);
@@ -384,15 +376,15 @@ public class Revision {
if (i == list.size() - 1) {
// newer than the newest range
if (r.getClusterId() == currentClusterNodeId) {
- // newer than all
- return range.timestamp + 1000;
+ // newer than all others, except for FUTURE
+ return NEWEST;
}
// happens in the future (not visible yet)
- return Long.MAX_VALUE;
+ return FUTURE;
}
break;
}
- result = range.timestamp;
+ result = range.seenAt;
}
return result;
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java Wed May 8 15:17:22 2013
@@ -23,7 +23,6 @@ import org.apache.jackrabbit.mk.api.Micr
import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import com.mongodb.DB;
@@ -54,7 +53,6 @@ public class ClusterTest {
}
@Test
- @Ignore
public void openCloseOpen() {
MemoryDocumentStore ds = new MemoryDocumentStore();
MemoryBlobStore bs = new MemoryBlobStore();
@@ -71,9 +69,29 @@ public class ClusterTest {
MongoMK mk2 = builder.setClusterId(2).open();
mk2.commit("/", "+\"a\": {}", null, null);
mk2.commit("/", "-\"a\"", null, null);
-
+
+ builder = new MongoMK.Builder();
+ builder.setDocumentStore(ds).setBlobStore(bs);
+ MongoMK mk3 = builder.setClusterId(3).open();
+ mk3.commit("/", "+\"a\": {}", null, null);
+ mk3.commit("/", "-\"a\"", null, null);
+
+ builder = new MongoMK.Builder();
+ builder.setDocumentStore(ds).setBlobStore(bs);
+ MongoMK mk4 = builder.setClusterId(4).open();
+ mk4.commit("/", "+\"a\": {}", null, null);
+
+ builder = new MongoMK.Builder();
+ builder.setDocumentStore(ds).setBlobStore(bs);
+ MongoMK mk5 = builder.setClusterId(5).open();
+ mk5.commit("/", "-\"a\"", null, null);
+ mk5.commit("/", "+\"a\": {}", null, null);
+
mk1.dispose();
mk2.dispose();
+ mk3.dispose();
+ mk4.dispose();
+ mk5.dispose();
}
@Test
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java?rev=1480319&r1=1480318&r2=1480319&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java Wed May 8 15:17:22 2013
@@ -134,27 +134,27 @@ public class RevisionTest {
assertEquals(-1, comp.compare(r3c1, r3c2));
// now we declare r2+r3 of c1 to be after r2+r3 of c2
- comp.add(r2c1, 20);
- comp.add(r2c2, 10);
+ comp.add(r2c1, new Revision(0x20, 0, 0));
+ comp.add(r2c2, new Revision(0x10, 0, 0));
assertEquals(
- "1:\n r120-0-1:20\n" +
- "2:\n r200-0-2:10\n", comp.toString());
+ "1:\n r120-0-1:r20-0-0\n" +
+ "2:\n r200-0-2:r10-0-0\n", comp.toString());
assertEquals(1, comp.compare(r1c1, r1c2));
assertEquals(1, comp.compare(r2c1, r2c2));
- // r3c2 is still "in the future"
+ // both r3cx are still "in the future"
assertEquals(-1, comp.compare(r3c1, r3c2));
// now we declare r3 of c1 to be before r3 of c2
// (with the same range timestamp,
// the revision timestamps are compared)
- comp.add(r3c1, 30);
- comp.add(r3c2, 30);
+ comp.add(r3c1, new Revision(0x30, 0, 0));
+ comp.add(r3c2, new Revision(0x30, 0, 0));
assertEquals(
- "1:\n r120-0-1:20 r130-0-1:30\n" +
- "2:\n r200-0-2:10 r300-0-2:30\n", comp.toString());
+ "1:\n r120-0-1:r20-0-0 r130-0-1:r30-0-0\n" +
+ "2:\n r200-0-2:r10-0-0 r300-0-2:r30-0-0\n", comp.toString());
assertEquals(1, comp.compare(r1c1, r1c2));
assertEquals(1, comp.compare(r2c1, r2c2));
@@ -165,22 +165,22 @@ public class RevisionTest {
assertEquals(1, comp.compare(r3c2, r3c1));
// get rid of old timestamps
- comp.purge(10);
+ comp.purge(0x10);
assertEquals(
- "1:\n r120-0-1:20 r130-0-1:30\n" +
- "2:\n r300-0-2:30\n", comp.toString());
- comp.purge(20);
+ "1:\n r120-0-1:r20-0-0 r130-0-1:r30-0-0\n" +
+ "2:\n r300-0-2:r30-0-0\n", comp.toString());
+ comp.purge(0x20);
assertEquals(
- "1:\n r130-0-1:30\n" +
- "2:\n r300-0-2:30\n", comp.toString());
+ "1:\n r130-0-1:r30-0-0\n" +
+ "2:\n r300-0-2:r30-0-0\n", comp.toString());
// update an entry
- comp.add(new Revision(0x301, 1, 2), 30);
+ comp.add(new Revision(0x301, 1, 2), new Revision(0x30, 0, 0));
assertEquals(
- "1:\n r130-0-1:30\n" +
- "2:\n r301-1-2:30\n", comp.toString());
+ "1:\n r130-0-1:r30-0-0\n" +
+ "2:\n r301-1-2:r30-0-0\n", comp.toString());
- comp.purge(30);
+ comp.purge(0x30);
assertEquals("", comp.toString());
}