You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/02 13:10:52 UTC

svn commit: r1583966 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: chetanm
Date: Wed Apr  2 11:10:51 2014
New Revision: 1583966

URL: http://svn.apache.org/r1583966
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)

Applying patch from Amit Jain with some changes

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Wed Apr  2 11:10:51 2014
@@ -191,16 +191,30 @@ public class ClusterNodeInfo {
      */
     private RecoverLockState revRecoveryLock;
 
-    ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state,
-                    RecoverLockState revRecoveryLock) {
+    /**
+     * In-memory flag indicating that this cluster node entry is new and is being added to the
+     * DocumentStore for the first time.
+     *
+     * If false, an earlier entry with the same machine and instance id already existed and is
+     * being reused.
+     */
+    private boolean newEntry;
+
+    private ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state,
+            RecoverLockState revRecoveryLock, Long leaseEnd, boolean newEntry) {
         this.id = id;
         this.startTime = getCurrentTime();
-        this.leaseEndTime = startTime;
+        if (leaseEnd == null) { // no earlier entry reused: lease initially ends at start time
+            this.leaseEndTime = startTime;
+        } else { // reused entry: keep its old lease end so a stale lease stays detectable for _lastRev recovery
+            this.leaseEndTime = leaseEnd;
+        }
         this.store = store;
         this.machineId = machineId;
         this.instanceId = instanceId;
         this.state = state;
         this.revRecoveryLock = revRecoveryLock;
+        this.newEntry = newEntry;
     }
 
     public int getId() {
@@ -214,7 +228,7 @@ public class ClusterNodeInfo {
      * @return the cluster node info
      */
     public static ClusterNodeInfo getInstance(DocumentStore store) {
-        return getInstance(store, MACHINE_ID, WORKING_DIR);
+        return getInstance(store, MACHINE_ID, WORKING_DIR, false); // updateLease=false: persist the entry's existing lease end
     }
 
     /**
@@ -227,6 +241,20 @@ public class ClusterNodeInfo {
      */
     public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
             String instanceId) {
+        return getInstance(store, machineId, instanceId, true); // updateLease=true: persist a fresh lease end (now + leaseTime)
+    }
+
+    /**
+     * Create a cluster node info instance for the store.
+     *
+     * @param store the document store (for the lease)
+     * @param machineId the machine id (null for MAC address)
+     * @param instanceId the instance id (null for current working directory)
+     * @param updateLease whether to update the lease
+     * @return the cluster node info
+     */
+    public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
+            String instanceId, boolean updateLease) {
         if (machineId == null) {
             machineId = MACHINE_ID;
         }
@@ -239,13 +267,27 @@ public class ClusterNodeInfo {
             update.set(ID, String.valueOf(clusterNode.id));
             update.set(MACHINE_ID_KEY, clusterNode.machineId);
             update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
-            update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime);
+            if (updateLease) {
+                update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime);
+            } else {
+                update.set(LEASE_END_KEY, clusterNode.leaseEndTime);
+            }
             update.set(INFO_KEY, clusterNode.toString());
             update.set(STATE, clusterNode.state.name());
             update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock.name());
-            boolean success =
-                    store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
-            if (success) {
+
+            final boolean success;
+            if (clusterNode.newEntry) {
+                //For new entry do a create. This ensures that if two nodes create
+                //entry with same id then only one would succeed
+                success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
+            } else {
+                // No expiration of earlier cluster info, so update
+                store.createOrUpdate(Collection.CLUSTER_NODES, update);
+                success = true;
+            }
+
+            if(success){
                 return clusterNode;
             }
         }
@@ -262,6 +304,8 @@ public class ClusterNodeInfo {
         int clusterNodeId = 0;
         int maxId = 0;
         ClusterNodeState state = ClusterNodeState.NONE;
+        Long prevLeaseEnd = null;
+        boolean newEntry = false;
         for (Document doc : list) {
             String key = doc.getId();
             int id;
@@ -288,18 +332,28 @@ public class ClusterNodeInfo {
                 // a different machine or instance
                 continue;
             }
-            // remove expired matching entries
-            store.remove(Collection.CLUSTER_NODES, key);
+
+            //and cluster node which matches current machine identity but
+            //not being used
             if (clusterNodeId == 0 || id < clusterNodeId) {
                 // if there are multiple, use the smallest value
                 clusterNodeId = id;
                 state = ClusterNodeState.fromString((String) doc.get(STATE));
+                prevLeaseEnd = leaseEnd;
             }
         }
+
+        //No existing entry with matching signature found so
+        //create a new entry
         if (clusterNodeId == 0) {
             clusterNodeId = maxId + 1;
+            newEntry = true;
         }
-        return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, RecoverLockState.NONE);
+
+        // Do not expire entries and stick on the earlier state, and leaseEnd so,
+        // that _lastRev recovery if needed is done.        
+        return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, 
+                RecoverLockState.NONE, prevLeaseEnd, newEntry);
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Apr  2 11:10:51 2014
@@ -382,6 +382,9 @@ public final class DocumentNodeStore
                 "DocumentNodeStore background thread");
         backgroundThread.setDaemon(true);
         checkLastRevRecovery();
+        // Renew the lease because it may have been stale
+        backgroundRenewClusterIdLease();
+
         backgroundThread.start();
 
         LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Apr  2 11:10:51 2014
@@ -24,12 +24,14 @@ import static com.google.common.collect.
 import static com.google.common.collect.Iterables.mergeSorted;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.CheckForNull;
 
 import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.apache.jackrabbit.oak.commons.PathUtils;
@@ -74,10 +76,8 @@ public class LastRevRecoveryAgent {
             long leaseEnd = nodeInfo.getLeaseEndTime();
 
             // Check if _lastRev recovery needed for this cluster node
-            // state == null && recoveryLock not held by someone
-            if (nodeInfo.isActive()
-                    && !nodeInfo.isBeingRecovered()) {
-
+            // state is Active && recoveryLock not held by someone
+            if (isRecoveryNeeded(nodeInfo)) {            
                 // retrieve the root document's _lastRev
                 NodeDocument root = missingLastRevUtil.getRoot();
                 Revision lastRev = root.getLastRev().get(clusterId);
@@ -260,7 +260,38 @@ public class LastRevRecoveryAgent {
         }
         return null;
     }
-
+    
+    /**
+     * Gets the ids of the cluster nodes that are candidates for _lastRev recovery.
+     *
+     * @return ids of cluster nodes that are active, whose lease has expired, and that are not being recovered
+     */
+    public List<String> getRecoveryCandidateNodes() {
+        Iterable<ClusterNodeInfoDocument> clusters = missingLastRevUtil.getAllClusters();
+        List<String> candidateClusterNodes = Lists.newArrayList();
+        
+        for (ClusterNodeInfoDocument nodeInfo : clusters) {
+            if (isRecoveryNeeded(nodeInfo)) {
+                candidateClusterNodes.add(nodeInfo.getId());
+            }
+        }
+        
+        return candidateClusterNodes;
+    }
+    
+    private boolean isRecoveryNeeded(ClusterNodeInfoDocument nodeInfo) { // a null document never needs recovery
+        if (nodeInfo != null) {
+            // Recovery is needed only if the node is still marked active, its lease has
+            // expired (clock time past leaseEnd) and no one else holds the recovery lock
+            if (nodeInfo.isActive()
+                    && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime()
+                    && !nodeInfo.isBeingRecovered()) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
     private static class ClusterPredicate implements Predicate<Revision> {
         private final int clusterId;
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Wed Apr  2 11:10:51 2014
@@ -39,7 +39,17 @@ public class MissingLastRevSeeker {
     public MissingLastRevSeeker(DocumentStore store) {
         this.store = store;
     }
-
+    
+    /**
+     * Gets all cluster node info documents, unfiltered; callers decide which need _lastRev recovery.
+     *
+     * @return the cluster node info documents with ids between MIN_ID_VALUE and MAX_ID_VALUE
+     */
+    public Iterable<ClusterNodeInfoDocument> getAllClusters() {
+        return store.query(Collection.CLUSTER_NODES, ClusterNodeInfoDocument.MIN_ID_VALUE,
+                ClusterNodeInfoDocument.MAX_ID_VALUE, Integer.MAX_VALUE);
+    }
+    
     /**
      * Gets the cluster node info for the given cluster node id.
      *
@@ -48,24 +58,7 @@ public class MissingLastRevSeeker {
      */
     public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
         // Fetch all documents.
-        List<ClusterNodeInfoDocument> nodes = store.query(Collection.CLUSTER_NODES, "0",
-                "a", Integer.MAX_VALUE);
-        Iterable<ClusterNodeInfoDocument> clusterIterable =
-                Iterables.filter(nodes,
-                        new Predicate<ClusterNodeInfoDocument>() {
-                            // Return cluster info for the required clusterId
-                            @Override
-                            public boolean apply(ClusterNodeInfoDocument input) {
-                                String id = input.getId();
-                                return (id.equals(String.valueOf(clusterId)));
-                            }
-                        });
-
-        if (clusterIterable.iterator().hasNext()) {
-            return clusterIterable.iterator().next();
-        }
-
-        return null;
+        return store.find(Collection.CLUSTER_NODES, String.valueOf(clusterId));
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Wed Apr  2 11:10:51 2014
@@ -32,7 +32,6 @@ import com.mongodb.QueryBuilder;
 import com.mongodb.ReadPreference;
 
 import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo;
-import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.Commit;
 import org.apache.jackrabbit.oak.plugins.document.Document;
@@ -55,24 +54,6 @@ public class MongoMissingLastRevSeeker e
     }
 
     @Override
-    public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
-        DBObject query =
-                start(NodeDocument.ID).is(String.valueOf(clusterId)).get();
-        DBCursor cursor =
-                getClusterNodeCollection().find(query)
-                        .setReadPreference(ReadPreference.secondaryPreferred());
-        try {
-            if (cursor.hasNext()) {
-                DBObject obj = cursor.next();
-                return store.convertFromDBObject(Collection.CLUSTER_NODES, obj);
-            }
-        } finally {
-            cursor.close();
-        }
-        return null;
-    }
-
-    @Override
     public CloseableIterable<NodeDocument> getCandidates(final long startTime,
             final long endTime) {
         DBObject query =

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java Wed Apr  2 11:10:51 2014
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul
 import java.util.List;
 
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
 import org.junit.Test;
 
 import com.mongodb.ReadPreference;
@@ -38,6 +40,10 @@ public class ClusterInfoTest {
     public void readWriteMode() throws InterruptedException {
 
         MemoryDocumentStore mem = new MemoryDocumentStore();
+        Clock clock = new Clock.Virtual();
+        clock.waitUntil(System.currentTimeMillis());
+        ClusterNodeInfo.setClock(clock);
+
         DocumentNodeStore ns1 = new DocumentMK.Builder().
                 setDocumentStore(mem).
                 setAsyncDelay(0).
@@ -46,8 +52,13 @@ public class ClusterInfoTest {
                 setDocumentStore(mem).
                 setAsyncDelay(0).
                 getNodeStore();
+        // Bring the current time forward to after the leaseTime which would have been 
+        // updated in the DocumentNodeStore initialization.
+        clock.waitUntil(clock.getTime() + ns1.getClusterInfo().getLeaseTime());
+
         ns1.getClusterInfo().setLeaseTime(0);
         ns2.getClusterInfo().setLeaseTime(0);
+
         List<ClusterNodeInfoDocument> list = mem.query(
                 Collection.CLUSTER_NODES, "0", "a", Integer.MAX_VALUE);
         assertEquals(2, list.size());
@@ -79,4 +90,8 @@ public class ClusterInfoTest {
         ns2.dispose();
     }
 
+    @After
+    public void tearDown(){
+        ClusterNodeInfo.setClock(null); // undo the clock override installed via ClusterNodeInfo.setClock in the tests
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1583966&r1=1583965&r2=1583966&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java Wed Apr  2 11:10:51 2014
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEqu
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.Lists;
@@ -100,6 +101,8 @@ public class LastRevSingleNodeRecoveryTe
 
     @Test
     public void testLastRevRestoreOnNodeStart() throws Exception {
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
+
         // pending updates
         setupScenario();
 
@@ -113,9 +116,6 @@ public class LastRevSingleNodeRecoveryTe
         mk = openMK(0, mk.getNodeStore().getDocumentStore());
 
         int pendingCount = mk.getPendingWriteCount();
-
-        // so that the current time is more than the current lease end
-        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
         // Immediately check again, now should not have done any changes.
         LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
         /** Now there should have been pendingCount updates **/
@@ -124,6 +124,7 @@ public class LastRevSingleNodeRecoveryTe
 
     @Test
     public void testLastRevRestore() throws Exception {
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
         setupScenario();
 
         int pendingCount = mk.getPendingWriteCount();
@@ -138,6 +139,7 @@ public class LastRevSingleNodeRecoveryTe
 
     @Test
     public void testNoMissingUpdates() throws Exception {
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
         setupScenario();
         mk.backgroundWrite();
 
@@ -158,7 +160,21 @@ public class LastRevSingleNodeRecoveryTe
         /** There should have been no updates **/
         assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
     }
+    
+    @Test
+    public void testNodeRecoveryNeeded() throws InterruptedException {
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
+        setupScenario();
+
+        // advance the clock past the current lease end so this node qualifies for recovery
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
 
+        LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+        Iterator<String> iter = recoveryAgent.getRecoveryCandidateNodes().iterator();
+        assertEquals(String.valueOf(1), iter.next()); // expect cluster node id "1" as the only candidate
+        assertEquals(false, iter.hasNext());
+    }
+    
     private void setupScenario() throws InterruptedException {
         // add some nodes which won't be returned
         mk.commit("/", "+\"u\" : { \"v\": {}}", null, null);
@@ -210,4 +226,3 @@ public class LastRevSingleNodeRecoveryTe
         fixture.dispose();
     }
 }
-