You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/02 13:10:52 UTC
svn commit: r1583966 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: chetanm
Date: Wed Apr 2 11:10:51 2014
New Revision: 1583966
URL: http://svn.apache.org/r1583966
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)
Applying patch from Amit Jain with some changes
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Wed Apr 2 11:10:51 2014
@@ -191,16 +191,30 @@ public class ClusterNodeInfo {
*/
private RecoverLockState revRecoveryLock;
- ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state,
- RecoverLockState revRecoveryLock) {
+ /**
+ * In-memory flag indicating that this ClusterNode entry is new and is being
+ * added to the DocumentStore for the first time.
+ *
+ * If true, the entry is written with DocumentStore#create, so that if two nodes
+ * try to create an entry with the same id, only one of them succeeds.
+ * If false, a previous entry for the current node existed and is being
+ * reused (written with createOrUpdate).
+ */
+ private boolean newEntry;
+
+ private ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state,
+ RecoverLockState revRecoveryLock, Long leaseEnd, boolean newEntry) {
this.id = id;
this.startTime = getCurrentTime();
- this.leaseEndTime = startTime;
+ if (leaseEnd == null) {
+ this.leaseEndTime = startTime;
+ } else {
+ this.leaseEndTime = leaseEnd;
+ }
this.store = store;
this.machineId = machineId;
this.instanceId = instanceId;
this.state = state;
this.revRecoveryLock = revRecoveryLock;
+ this.newEntry = newEntry;
}
public int getId() {
@@ -214,7 +228,7 @@ public class ClusterNodeInfo {
* @return the cluster node info
*/
public static ClusterNodeInfo getInstance(DocumentStore store) {
- return getInstance(store, MACHINE_ID, WORKING_DIR);
+ // updateLease=false: keep the lease end time taken over from a reused
+ // (previous) entry instead of extending it, so that a stale entry can still
+ // be picked up by _lastRev recovery before the lease is renewed.
+ // NOTE(review): the (store, machineId, instanceId) overload passes
+ // updateLease=true instead -- confirm that this difference is intended.
+ return getInstance(store, MACHINE_ID, WORKING_DIR, false);
}
/**
@@ -227,6 +241,20 @@ public class ClusterNodeInfo {
*/
public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
String instanceId) {
+ return getInstance(store, machineId, instanceId, true);
+ }
+
+ /**
+ * Create a cluster node info instance for the store.
+ *
+ * @param store the document store (for the lease)
+ * @param machineId the machine id (null for MAC address)
+ * @param instanceId the instance id (null for current working directory)
+ * @param updateLease whether to update the lease
+ * @return the cluster node info
+ */
+ public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
+ String instanceId, boolean updateLease) {
if (machineId == null) {
machineId = MACHINE_ID;
}
@@ -239,13 +267,27 @@ public class ClusterNodeInfo {
update.set(ID, String.valueOf(clusterNode.id));
update.set(MACHINE_ID_KEY, clusterNode.machineId);
update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
- update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime);
+ if (updateLease) {
+ update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime);
+ } else {
+ update.set(LEASE_END_KEY, clusterNode.leaseEndTime);
+ }
update.set(INFO_KEY, clusterNode.toString());
update.set(STATE, clusterNode.state.name());
update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock.name());
- boolean success =
- store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
- if (success) {
+
+ final boolean success;
+ if (clusterNode.newEntry) {
+ //For new entry do a create. This ensures that if two nodes create
+ //entry with same id then only one would succeed
+ success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
+ } else {
+ // No expiration of earlier cluster info, so update
+ store.createOrUpdate(Collection.CLUSTER_NODES, update);
+ success = true;
+ }
+
+ if(success){
return clusterNode;
}
}
@@ -262,6 +304,8 @@ public class ClusterNodeInfo {
int clusterNodeId = 0;
int maxId = 0;
ClusterNodeState state = ClusterNodeState.NONE;
+ Long prevLeaseEnd = null;
+ boolean newEntry = false;
for (Document doc : list) {
String key = doc.getId();
int id;
@@ -288,18 +332,28 @@ public class ClusterNodeInfo {
// a different machine or instance
continue;
}
- // remove expired matching entries
- store.remove(Collection.CLUSTER_NODES, key);
+
+ //and cluster node which matches current machine identity but
+ //not being used
if (clusterNodeId == 0 || id < clusterNodeId) {
// if there are multiple, use the smallest value
clusterNodeId = id;
state = ClusterNodeState.fromString((String) doc.get(STATE));
+ prevLeaseEnd = leaseEnd;
}
}
+
+ //No existing entry with matching signature found so
+ //create a new entry
if (clusterNodeId == 0) {
clusterNodeId = maxId + 1;
+ newEntry = true;
}
- return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, RecoverLockState.NONE);
+
+ // Do not expire entries and stick on the earlier state, and leaseEnd so,
+ // that _lastRev recovery if needed is done.
+ return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state,
+ RecoverLockState.NONE, prevLeaseEnd, newEntry);
}
/**
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Apr 2 11:10:51 2014
@@ -382,6 +382,9 @@ public final class DocumentNodeStore
"DocumentNodeStore background thread");
backgroundThread.setDaemon(true);
checkLastRevRecovery();
+ // Renew the lease because it may have been stale
+ backgroundRenewClusterIdLease();
+
backgroundThread.start();
LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Apr 2 11:10:51 2014
@@ -24,12 +24,14 @@ import static com.google.common.collect.
import static com.google.common.collect.Iterables.mergeSorted;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckForNull;
import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.jackrabbit.oak.commons.PathUtils;
@@ -74,10 +76,8 @@ public class LastRevRecoveryAgent {
long leaseEnd = nodeInfo.getLeaseEndTime();
// Check if _lastRev recovery needed for this cluster node
- // state == null && recoveryLock not held by someone
- if (nodeInfo.isActive()
- && !nodeInfo.isBeingRecovered()) {
-
+ // state is Active && recoveryLock not held by someone
+ if (isRecoveryNeeded(nodeInfo)) {
// retrieve the root document's _lastRev
NodeDocument root = missingLastRevUtil.getRoot();
Revision lastRev = root.getLastRev().get(clusterId);
@@ -260,7 +260,38 @@ public class LastRevRecoveryAgent {
}
return null;
}
-
+
+ /**
+ * Gets the ids of the cluster nodes that are candidates for _lastRev recovery.
+ * A cluster node is a candidate when its entry is active, the node store
+ * clock's current time is past its lease end time, and it is not already
+ * being recovered (recovery lock not held by someone else).
+ *
+ * @return the ids of the recovery candidate nodes (empty if there are none)
+ */
+ public List<String> getRecoveryCandidateNodes() {
+ Iterable<ClusterNodeInfoDocument> clusters = missingLastRevUtil.getAllClusters();
+ List<String> candidateClusterNodes = Lists.newArrayList();
+
+ for (ClusterNodeInfoDocument nodeInfo : clusters) {
+ if (isRecoveryNeeded(nodeInfo)) {
+ candidateClusterNodes.add(nodeInfo.getId());
+ }
+ }
+
+ return candidateClusterNodes;
+ }
+
+ /**
+ * Checks whether _lastRev recovery is needed for the given cluster node.
+ *
+ * @param nodeInfo the cluster node info document; may be null
+ * @return true if the node is active, its lease has expired according to the
+ *         node store clock, and it is not already being recovered; false
+ *         otherwise, including when nodeInfo is null
+ */
+ private boolean isRecoveryNeeded(ClusterNodeInfoDocument nodeInfo) {
+ if (nodeInfo != null) {
+ // Check if _lastRev recovery needed for this cluster node
+ // state is Active && currentTime past the leaseEnd time && recoveryLock not held by someone
+ if (nodeInfo.isActive()
+ && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime()
+ && !nodeInfo.isBeingRecovered()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static class ClusterPredicate implements Predicate<Revision> {
private final int clusterId;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Wed Apr 2 11:10:51 2014
@@ -39,7 +39,17 @@ public class MissingLastRevSeeker {
public MissingLastRevSeeker(DocumentStore store) {
this.store = store;
}
-
+
+ /**
+ * Gets all cluster node info documents, i.e. every entry of the
+ * CLUSTER_NODES collection whose id lies between
+ * ClusterNodeInfoDocument.MIN_ID_VALUE and MAX_ID_VALUE. No filtering is
+ * done here; callers decide which of these entries need _lastRev recovery.
+ *
+ * @return all cluster node info documents
+ */
+ public Iterable<ClusterNodeInfoDocument> getAllClusters() {
+ return store.query(Collection.CLUSTER_NODES, ClusterNodeInfoDocument.MIN_ID_VALUE,
+ ClusterNodeInfoDocument.MAX_ID_VALUE, Integer.MAX_VALUE);
+ }
+
/**
* Gets the cluster node info for the given cluster node id.
*
@@ -48,24 +58,7 @@ public class MissingLastRevSeeker {
*/
public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
// Fetch all documents.
- List<ClusterNodeInfoDocument> nodes = store.query(Collection.CLUSTER_NODES, "0",
- "a", Integer.MAX_VALUE);
- Iterable<ClusterNodeInfoDocument> clusterIterable =
- Iterables.filter(nodes,
- new Predicate<ClusterNodeInfoDocument>() {
- // Return cluster info for the required clusterId
- @Override
- public boolean apply(ClusterNodeInfoDocument input) {
- String id = input.getId();
- return (id.equals(String.valueOf(clusterId)));
- }
- });
-
- if (clusterIterable.iterator().hasNext()) {
- return clusterIterable.iterator().next();
- }
-
- return null;
+ return store.find(Collection.CLUSTER_NODES, String.valueOf(clusterId));
}
/**
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Wed Apr 2 11:10:51 2014
@@ -32,7 +32,6 @@ import com.mongodb.QueryBuilder;
import com.mongodb.ReadPreference;
import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo;
-import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Commit;
import org.apache.jackrabbit.oak.plugins.document.Document;
@@ -55,24 +54,6 @@ public class MongoMissingLastRevSeeker e
}
@Override
- public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
- DBObject query =
- start(NodeDocument.ID).is(String.valueOf(clusterId)).get();
- DBCursor cursor =
- getClusterNodeCollection().find(query)
- .setReadPreference(ReadPreference.secondaryPreferred());
- try {
- if (cursor.hasNext()) {
- DBObject obj = cursor.next();
- return store.convertFromDBObject(Collection.CLUSTER_NODES, obj);
- }
- } finally {
- cursor.close();
- }
- return null;
- }
-
- @Override
public CloseableIterable<NodeDocument> getCandidates(final long startTime,
final long endTime) {
DBObject query =
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java Wed Apr 2 11:10:51 2014
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul
import java.util.List;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
import org.junit.Test;
import com.mongodb.ReadPreference;
@@ -38,6 +40,10 @@ public class ClusterInfoTest {
public void readWriteMode() throws InterruptedException {
MemoryDocumentStore mem = new MemoryDocumentStore();
+ Clock clock = new Clock.Virtual();
+ clock.waitUntil(System.currentTimeMillis());
+ ClusterNodeInfo.setClock(clock);
+
DocumentNodeStore ns1 = new DocumentMK.Builder().
setDocumentStore(mem).
setAsyncDelay(0).
@@ -46,8 +52,13 @@ public class ClusterInfoTest {
setDocumentStore(mem).
setAsyncDelay(0).
getNodeStore();
+ // Bring the current time forward to after the leaseTime which would have been
+ // updated in the DocumentNodeStore initialization.
+ clock.waitUntil(clock.getTime() + ns1.getClusterInfo().getLeaseTime());
+
ns1.getClusterInfo().setLeaseTime(0);
ns2.getClusterInfo().setLeaseTime(0);
+
List<ClusterNodeInfoDocument> list = mem.query(
Collection.CLUSTER_NODES, "0", "a", Integer.MAX_VALUE);
assertEquals(2, list.size());
@@ -79,4 +90,8 @@ public class ClusterInfoTest {
ns2.dispose();
}
+ @After
+ public void tearDown(){
+ ClusterNodeInfo.setClock(null);
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java Wed Apr 2 11:10:51 2014
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEqu
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Lists;
@@ -100,6 +101,8 @@ public class LastRevSingleNodeRecoveryTe
@Test
public void testLastRevRestoreOnNodeStart() throws Exception {
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
+
// pending updates
setupScenario();
@@ -113,9 +116,6 @@ public class LastRevSingleNodeRecoveryTe
mk = openMK(0, mk.getNodeStore().getDocumentStore());
int pendingCount = mk.getPendingWriteCount();
-
- // so that the current time is more than the current lease end
- clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
// Immediately check again, now should not have done any changes.
LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
/** Now there should have been pendingCount updates **/
@@ -124,6 +124,7 @@ public class LastRevSingleNodeRecoveryTe
@Test
public void testLastRevRestore() throws Exception {
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
setupScenario();
int pendingCount = mk.getPendingWriteCount();
@@ -138,6 +139,7 @@ public class LastRevSingleNodeRecoveryTe
@Test
public void testNoMissingUpdates() throws Exception {
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
setupScenario();
mk.backgroundWrite();
@@ -158,7 +160,21 @@ public class LastRevSingleNodeRecoveryTe
/** There should have been no updates **/
assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
}
+
+ /**
+ * Verifies that, once the test clock has moved past the current lease end,
+ * the current cluster node is reported as the only _lastRev recovery
+ * candidate.
+ */
+ @Test
+ public void testNodeRecoveryNeeded() throws InterruptedException {
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
+ setupScenario();
+
+ // so that the current time is more than the current lease end
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+ LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+ Iterator<String> iter = recoveryAgent.getRecoveryCandidateNodes().iterator();
+ // exactly one candidate is expected: the cluster node with id "1"
+ assertEquals(String.valueOf(1), iter.next());
+ assertEquals(false, iter.hasNext());
+ }
+
private void setupScenario() throws InterruptedException {
// add some nodes which won't be returned
mk.commit("/", "+\"u\" : { \"v\": {}}", null, null);
@@ -210,4 +226,3 @@ public class LastRevSingleNodeRecoveryTe
fixture.dispose();
}
}
-