You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/02 08:14:21 UTC

svn commit: r1583893 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java

Author: chetanm
Date: Wed Apr  2 06:14:21 2014
New Revision: 1583893

URL: http://svn.apache.org/r1583893
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)

Refactored to use new methods

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1583893&r1=1583892&r2=1583893&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Apr  2 06:14:21 2014
@@ -23,8 +23,6 @@ import static com.google.common.collect.
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.mergeSorted;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +35,7 @@ import com.google.common.collect.Maps;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,12 +71,12 @@ public class LastRevRecoveryAgent {
         ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
 
         if (nodeInfo != null) {
-            Long leaseEnd = (Long) (nodeInfo.get(ClusterNodeInfo.LEASE_END_KEY));
+            long leaseEnd = nodeInfo.getLeaseEndTime();
 
             // Check if _lastRev recovery needed for this cluster node
             // state == null && recoveryLock not held by someone
-            if (nodeInfo.get(ClusterNodeInfo.STATE) != null
-                    && nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_LOCK) == null) {
+            if (nodeInfo.isActive()
+                    && !nodeInfo.isBeingRecovered()) {
 
                 // retrieve the root document's _lastRev
                 NodeDocument root = missingLastRevUtil.getRoot();
@@ -89,14 +88,15 @@ public class LastRevRecoveryAgent {
                 // Endtime is the leaseEnd + the asyncDelay
                 long endTime = leaseEnd + nodeStore.getAsyncDelay();
 
-                log.info("Recovering candidates modified in time range : {0}",
-                        new Object[] {startTime, endTime});
+                log.info("Recovering candidates modified in time range : [{},{}] for clusterId [{}]",
+                        Utils.timestampToString(startTime),
+                        Utils.timestampToString(endTime), clusterId);
 
                 return recoverCandidates(clusterId, startTime, endTime);
             }
         }
 
-        log.info("No recovery needed for clusterId");
+        log.debug("No recovery needed for clusterId {}", clusterId);
         return 0;
     }
 
@@ -193,27 +193,25 @@ public class LastRevRecoveryAgent {
      * @return the int the number of restored nodes
      */
     private int recoverCandidates(final int clusterId, final long startTime, final long endTime) {
-        // take a lock on the update process by setting the value of the lock to true
-        updateRecoveryLockOnCluster(clusterId, ClusterNodeInfo.REV_RECOVERY_ON);
+        boolean lockAcquired = missingLastRevUtil.acquireRecoveryLock(clusterId);
 
-        Iterable<NodeDocument> suspects =
-                missingLastRevUtil.getCandidates(startTime, endTime);
-        if (log.isDebugEnabled()) {
-            log.debug("_lastRev recovery candidates : {}", suspects);
+        //TODO What if recovery is being performed for current clusterNode by some other node
+        //should we halt the startup
+        if(!lockAcquired){
+            log.info("Last revision recovery already being performed by some other node. " +
+                    "Would not attempt recovery");
+            return 0;
         }
 
+        Iterable<NodeDocument> suspects = missingLastRevUtil.getCandidates(startTime, endTime);
+        log.debug("Performing Last Revision recovery for cluster {}", clusterId);
+
         try {
             return recover(suspects.iterator(), clusterId);
         } finally {
-            if (suspects instanceof Closeable) {
-                try {
-                    ((Closeable) suspects).close();
-                } catch (IOException e) {
-                    log.error("Error closing iterable : ", e);
-                }
-            }
+            Utils.closeIfCloseable(suspects);
             // Relinquish the lock on the recovery for the cluster on the clusterInfo
-            updateRecoveryLockOnCluster(clusterId, null);
+            missingLastRevUtil.releaseRecoveryLock(clusterId);
         }
     }
 
@@ -263,18 +261,6 @@ public class LastRevRecoveryAgent {
         return null;
     }
 
-    /**
-     * Set/Unset lock value on the clusterInfo for the clusterId
-     * 
-     * @param clusterId for which _lastRev recovery operation performed
-     * @param value the lock value
-     */
-    protected void updateRecoveryLockOnCluster(int clusterId, String value) {
-        UpdateOp update = new UpdateOp("" + clusterId, true);
-        update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, value);
-        nodeStore.getDocumentStore().createOrUpdate(Collection.CLUSTER_NODES, update);
-    }
-
     private static class ClusterPredicate implements Predicate<Revision> {
         private final int clusterId;