Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2018/07/03 15:25:39 UTC

svn commit: r1834986 [1/2] - in /jackrabbit/oak/trunk: oak-run/src/main/java/org/apache/jackrabbit/oak/run/ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/...

Author: mreutegg
Date: Tue Jul  3 15:25:38 2018
New Revision: 1834986

URL: http://svn.apache.org/viewvc?rev=1834986&view=rev
Log:
OAK-7316: Greedy ClusterNodeInfo

Added:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparatorTest.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerTest.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLockTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BaseDocumentDiscoveryLiteServiceTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FormatVersionTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RandomDocumentNodeStoreSweepTest.java

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java Tue Jul  3 15:25:38 2018
@@ -50,7 +50,7 @@ class RecoveryCommand implements Command
                 System.exit(1);
             }
             MongoDocumentStore docStore = (MongoDocumentStore) dns.getDocumentStore();
-            LastRevRecoveryAgent agent = new LastRevRecoveryAgent(dns);
+            LastRevRecoveryAgent agent = new LastRevRecoveryAgent(docStore, dns);
             MongoMissingLastRevSeeker seeker = new MongoMissingLastRevSeeker(
                     docStore, dns.getClock());
             CloseableIterable<NodeDocument> docs = seeker.getCandidates(0);
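
The agent no longer takes just the DocumentNodeStore: the new two-argument constructor (see the LastRevRecoveryAgent change further down) pairs the DocumentStore with a RevisionContext, and the DocumentNodeStore serves as that context here. A minimal sketch of driving recovery with the updated API, assuming docStore, dns and a clusterId are set up as in the command above:

    LastRevRecoveryAgent agent = new LastRevRecoveryAgent(docStore, dns);
    // number of recovered documents, or -1 while another cluster node
    // still holds the recovery lock for this clusterId
    int recovered = agent.recover(clusterId);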

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Tue Jul  3 15:25:38 2018
@@ -18,17 +18,23 @@ package org.apache.jackrabbit.oak.plugin
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.NONE;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getModuleVersion;
 
 import java.lang.management.ManagementFactory;
 import java.net.NetworkInterface;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -161,7 +167,7 @@ public class ClusterNodeInfo {
     /**
      * The unique machine id (the MAC address if available).
      */
-    private static final String MACHINE_ID = getMachineId();
+    private static final String MACHINE_ID = getHardwareMachineId();
 
     /**
      * The process id (if available).
@@ -296,7 +302,7 @@ public class ClusterNodeInfo {
     /**
      * The state of the cluster node.
      */
-    private ClusterNodeState state;
+    private ClusterNodeState state = ACTIVE;
 
     /**
      * OAK-2739 / OAK-3397 : once a lease check turns out negative, this flag
@@ -314,17 +320,6 @@ public class ClusterNodeInfo {
     private boolean leaseCheckDisabled;
 
     /**
-     * Tracks the fact whether the lease has *ever* been renewed by this instance
-     * or has just be read from the document store at initialization time.
-     */
-    private boolean renewed;
-
-    /**
-     * The revLock value of the cluster;
-     */
-    private RecoverLockState revRecoveryLock;
-
-    /**
      * In memory flag indicating that this ClusterNode is entry is new and is being added to
      * DocumentStore for the first time
      *
@@ -339,21 +334,15 @@ public class ClusterNodeInfo {
      */
     private LeaseFailureHandler leaseFailureHandler;
 
-    private ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state,
-            RecoverLockState revRecoveryLock, Long leaseEnd, boolean newEntry) {
+    private ClusterNodeInfo(int id, DocumentStore store, String machineId,
+                            String instanceId, boolean newEntry) {
         this.id = id;
         this.startTime = getCurrentTime();
-        if (leaseEnd == null) {
-            this.leaseEndTime = startTime;
-        } else {
-            this.leaseEndTime = leaseEnd;
-        }
-        this.renewed = false; // will be updated once we renew it the first time
+        this.leaseEndTime = this.startTime +leaseTime;
+        this.previousLeaseEndTime = this.leaseEndTime;
         this.store = store;
         this.machineId = machineId;
         this.instanceId = instanceId;
-        this.state = state;
-        this.revRecoveryLock = revRecoveryLock;
         this.newEntry = newEntry;
         this.leaseCheckDisabled = DEFAULT_LEASE_CHECK_DISABLED;
     }
@@ -366,20 +355,29 @@ public class ClusterNodeInfo {
         return id;
     }
 
+    String getMachineId() {
+        return machineId;
+    }
+
+    String getInstanceId() {
+        return instanceId;
+    }
+
     /**
-     * Create a dummy cluster node info instance to be utilized for read only access to underlying store.
-     * @param store
+     * Create a cluster node info instance to be utilized for read only access
+     * to underlying store.
+     *
+     * @param store the document store.
      * @return the cluster node info
      */
     public static ClusterNodeInfo getReadOnlyInstance(DocumentStore store) {
-        return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, ACTIVE,
-                RecoverLockState.NONE, null, true) {
+        return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, true) {
             @Override
             public void dispose() {
             }
 
             @Override
-            public long getLeaseTime() {
+            public long getLeaseEndTime() {
                 return Long.MAX_VALUE;
             }
 
@@ -401,42 +399,21 @@ public class ClusterNodeInfo {
     }
 
     /**
-     * Create a cluster node info instance for the store, with the
-     *
-     * @param store the document store (for the lease)
-     * @param configuredClusterId the configured cluster id (or 0 for dynamic assignment)
-     * @return the cluster node info
-     */
-    public static ClusterNodeInfo getInstance(DocumentStore store, int configuredClusterId) {
-        return getInstance(store, MACHINE_ID, WORKING_DIR, configuredClusterId, false);
-    }
-
-    /**
-     * Create a cluster node info instance for the store.
-     *
-     * @param store the document store (for the lease)
-     * @param machineId the machine id (null for MAC address)
-     * @param instanceId the instance id (null for current working directory)
-     * @return the cluster node info
-     */
-    public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
-            String instanceId) {
-        return getInstance(store, machineId, instanceId, 0, true);
-    }
-
-    /**
-     * Create a cluster node info instance for the store.
+     * Get or create a cluster node info instance for the store.
      *
      * @param store the document store (for the lease)
+     * @param recoveryHandler the recovery handler to call for a clusterId with
+     *                        an expired lease.
      * @param machineId the machine id (null for MAC address)
      * @param instanceId the instance id (null for current working directory)
      * @param configuredClusterId the configured cluster id (or 0 for dynamic assignment)
-     * @param updateLease whether to update the lease
      * @return the cluster node info
      */
-    public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
-            String instanceId, int configuredClusterId, boolean updateLease) {
-
+    public static ClusterNodeInfo getInstance(DocumentStore store,
+                                              RecoveryHandler recoveryHandler,
+                                              String machineId,
+                                              String instanceId,
+                                              int configuredClusterId) {
         // defaults for machineId and instanceID
         if (machineId == null) {
             machineId = MACHINE_ID;
@@ -447,54 +424,70 @@ public class ClusterNodeInfo {
 
         int retries = 10;
         for (int i = 0; i < retries; i++) {
-            ClusterNodeInfo clusterNode = createInstance(store, machineId, instanceId, configuredClusterId, i == 0);
+            Map.Entry<ClusterNodeInfo, Long> suggestedClusterNode =
+                    createInstance(store, recoveryHandler, machineId,
+                            instanceId, configuredClusterId, i == 0);
+            ClusterNodeInfo clusterNode = suggestedClusterNode.getKey();
+            Long currentStartTime = suggestedClusterNode.getValue();
             String key = String.valueOf(clusterNode.id);
-            UpdateOp update = new UpdateOp(key, true);
+            UpdateOp update = new UpdateOp(key, clusterNode.newEntry);
             update.set(MACHINE_ID_KEY, clusterNode.machineId);
             update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
-            if (updateLease) {
-                update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime);
-            } else {
-                update.set(LEASE_END_KEY, clusterNode.leaseEndTime);
-            }
+            update.set(LEASE_END_KEY, clusterNode.leaseEndTime);
             update.set(START_TIME_KEY, clusterNode.startTime);
             update.set(INFO_KEY, clusterNode.toString());
-            update.set(STATE, clusterNode.state.name());
-            update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock.name());
+            update.set(STATE, ACTIVE.name());
             update.set(OAK_VERSION_KEY, OAK_VERSION);
 
+            ClusterNodeInfoDocument before = null;
             final boolean success;
             if (clusterNode.newEntry) {
                 // For new entry do a create. This ensures that if two nodes
                 // create entry with same id then only one would succeed
                 success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
             } else {
-                // No expiration of earlier cluster info, so update
-                store.createOrUpdate(Collection.CLUSTER_NODES, update);
-                success = true;
+                // remember how the entry looked before the update
+                before = store.find(Collection.CLUSTER_NODES, key);
+
+                // perform a conditional update with a check on the startTime
+                // field. If there are competing cluster nodes trying to acquire
+                // the same inactive clusterId, only one of them will succeed.
+                update.equals(START_TIME_KEY, currentStartTime);
+                // ensure some other conditions
+                // 1) must not be active
+                update.notEquals(STATE, ACTIVE.name());
+                // 2) must not have a recovery lock
+                update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
+
+                success = store.findAndUpdate(Collection.CLUSTER_NODES, update) != null;
             }
 
             if (success) {
+                logClusterIdAcquired(clusterNode, before);
                 return clusterNode;
             }
+            LOG.info("Collision while acquiring clusterId {}. Retrying...",
+                    clusterNode.getId());
         }
-        throw new DocumentStoreException("Could not get cluster node info (retried " + retries + " times)");
+        throw new DocumentStoreException("Could not get cluster node info (tried " + retries + " times)");
     }
 
-    private static ClusterNodeInfo createInstance(DocumentStore store, String machineId,
-            String instanceId, int configuredClusterId, boolean waitForLease) {
+    private static Map.Entry<ClusterNodeInfo, Long> createInstance(DocumentStore store,
+                                                                   RecoveryHandler recoveryHandler,
+                                                                   String machineId,
+                                                                   String instanceId,
+                                                                   int configuredClusterId,
+                                                                   boolean waitForLease) {
 
         long now = getCurrentTime();
-        int clusterNodeId = 0;
         int maxId = 0;
-        ClusterNodeState state = ClusterNodeState.NONE;
-        RecoverLockState lockState = RecoverLockState.NONE;
-        Long prevLeaseEnd = null;
-        boolean newEntry = false;
 
         ClusterNodeInfoDocument alreadyExistingConfigured = null;
         String reuseFailureReason = "";
         List<ClusterNodeInfoDocument> list = ClusterNodeInfoDocument.all(store);
+        Map<Integer, Long> startTimes = new HashMap<>();
+        SortedSet<ClusterNodeInfo> candidates = new TreeSet<>(
+                new ClusterNodeInfoComparator(machineId, instanceId));
 
         for (ClusterNodeInfoDocument doc : list) {
 
@@ -510,6 +503,13 @@ public class ClusterNodeInfo {
 
             maxId = Math.max(maxId, id);
 
+            // cannot use an entry without start time
+            if (doc.getStartTime() == -1) {
+                reuseFailureReason = reject(id,
+                        "Cluster node entry does not have a startTime. ");
+                continue;
+            }
+
             // if a cluster id was configured: check that and abort if it does
             // not match
             if (configuredClusterId != 0) {
@@ -524,7 +524,11 @@ public class ClusterNodeInfo {
             String mId = "" + doc.get(MACHINE_ID_KEY);
             String iId = "" + doc.get(INSTANCE_ID_KEY);
 
-            if (leaseEnd != null && leaseEnd > now) {
+            // handle active clusterId with valid lease and no recovery lock
+            // -> potentially wait for lease if machine and instance id match
+            if (leaseEnd != null
+                    && leaseEnd > now
+                    && !doc.isRecoveryNeeded(now)) {
                 // wait if (a) instructed to, and (b) also the remaining time
                 // time is not much bigger than the lease interval (in which
                 // case something is very very wrong anyway)
@@ -532,46 +536,48 @@ public class ClusterNodeInfo {
                         && iId.equals(instanceId)) {
                     boolean worthRetrying = waitForLeaseExpiry(store, doc, leaseEnd, machineId, instanceId);
                     if (worthRetrying) {
-                        return createInstance(store, machineId, instanceId, configuredClusterId, false);
+                        return createInstance(store, recoveryHandler, machineId, instanceId, configuredClusterId, false);
                     }
                 }
 
-                reuseFailureReason = "leaseEnd " + leaseEnd + " > " + now + " - " + (leaseEnd - now) + "ms in the future";
+                reuseFailureReason = reject(id,
+                        "leaseEnd " + leaseEnd + " > " + now + " - " + (leaseEnd - now) + "ms in the future");
                 continue;
             }
 
-            // remove entries with "random:" keys if not in use (no lease at all) 
-            if (mId.startsWith(RANDOM_PREFIX) && leaseEnd == null) {
-                store.remove(Collection.CLUSTER_NODES, key);
-                LOG.debug("Cleaned up cluster node info for clusterNodeId {} [machineId: {}, leaseEnd: n/a]", id, mId);
-                if (alreadyExistingConfigured == doc) {
-                    // we removed it, so we can't re-use it after all
-                    alreadyExistingConfigured = null;
+            // if we get here the clusterId either:
+            // 1) is inactive
+            // 2) needs recovery
+            if (doc.isRecoveryNeeded(now)) {
+                if (mId.equals(machineId) && iId.equals(instanceId)) {
+                    // this id matches our environment and has an expired lease
+                    // use it after a successful recovery
+                    if (!recoveryHandler.recover(id)) {
+                        reuseFailureReason = reject(id,
+                                "needs recovery and was unable to perform it myself");
+                        continue;
+                    }
+                } else {
+                    // a different machine or instance
+                    reuseFailureReason = reject(id,
+                            "needs recovery and machineId/instanceId do not match: " +
+                                    mId + "/" + iId + " != " + machineId + "/" + instanceId);
+                    continue;
                 }
-                continue;
             }
 
-            if (!mId.equals(machineId) || !iId.equals(instanceId)) {
-                // a different machine or instance
-                reuseFailureReason = "machineId/instanceId do not match: " + mId + "/" + iId + " != " + machineId + "/" + instanceId;
-                continue;
-            }
+            // if we get here the cluster node entry is inactive. if recovery
+            // was needed, then it was successful
 
-            // a cluster node which matches current machine identity but
-            // not being used
-            if (clusterNodeId == 0 || id < clusterNodeId) {
-                // if there are multiple, use the smallest value
-                clusterNodeId = id;
-                state = ClusterNodeState.fromString((String) doc.get(STATE));
-                prevLeaseEnd = leaseEnd;
-                lockState = RecoverLockState.fromString((String) doc.get(REV_RECOVERY_LOCK));
-            }
+            // create a candidate. those with matching machine and instance id
+            // are preferred, then the one with the lowest clusterId.
+            candidates.add(new ClusterNodeInfo(id, store, mId, iId, false));
+            startTimes.put(id, doc.getStartTime());
         }
 
-        // No usable existing entry with matching signature found so
-        // create a new entry
-        if (clusterNodeId == 0) {
-            newEntry = true;
+        if (candidates.isEmpty()) {
+            // No usable existing entry found
+            int clusterNodeId;
             if (configuredClusterId != 0) {
                 if (alreadyExistingConfigured != null) {
                     throw new DocumentStoreException(
@@ -581,12 +587,41 @@ public class ClusterNodeInfo {
             } else {
                 clusterNodeId = maxId + 1;
             }
+            // No usable existing entry found so create a new entry
+            candidates.add(new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, true));
         }
 
-        // Do not expire entries and stick on the earlier state, and leaseEnd so,
-        // that _lastRev recovery if needed is done.
-        return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state,
-                lockState, prevLeaseEnd, newEntry);
+        // use the best candidate
+        ClusterNodeInfo info = candidates.first();
+        // and replace with an info matching the current machine and instance id
+        info = new ClusterNodeInfo(info.id, store, machineId, instanceId, info.newEntry);
+        return new AbstractMap.SimpleImmutableEntry<>(info, startTimes.get(info.getId()));
+    }
+
+    private static void logClusterIdAcquired(ClusterNodeInfo clusterNode,
+                                             ClusterNodeInfoDocument before) {
+        String type = clusterNode.newEntry ? "new" : "existing";
+        String machineInfo = clusterNode.machineId;
+        String instanceInfo = clusterNode.instanceId;
+        if (before != null) {
+            // machineId or instanceId may have changed
+            String beforeMachineId = String.valueOf(before.get(MACHINE_ID_KEY));
+            String beforeInstanceId = String.valueOf(before.get(INSTANCE_ID_KEY));
+            if (!clusterNode.machineId.equals(beforeMachineId)) {
+                machineInfo = "(changed) " + beforeMachineId + " -> " + machineInfo;
+            }
+            if (!clusterNode.instanceId.equals(beforeInstanceId)) {
+                instanceInfo = "(changed) " + beforeInstanceId + " -> " + instanceInfo;
+            }
+        }
+        LOG.info("Acquired ({}) clusterId {}. MachineId {}, InstanceId {}",
+                type, clusterNode.getId(), machineInfo, instanceInfo);
+
+    }
+
+    private static String reject(int id, String reason) {
+        LOG.debug("Cannot acquire {}: {}", id, reason);
+        return reason;
     }
 
     private static boolean waitForLeaseExpiry(DocumentStore store, ClusterNodeInfoDocument cdoc, long leaseEnd, String machineId,
@@ -645,7 +680,7 @@ public class ClusterNodeInfo {
      * @throws DocumentStoreException if the lease expired.
      */
     public void performLeaseCheck() throws DocumentStoreException {
-        if (leaseCheckDisabled || !renewed) {
+        if (leaseCheckDisabled) {
             // if leaseCheckDisabled is set we never do the check, so return fast
 
             // the 'renewed' flag indicates if this instance *ever* renewed the lease after startup
@@ -818,9 +853,8 @@ public class ClusterNodeInfo {
 
         UpdateOp update = new UpdateOp("" + id, false);
         update.set(LEASE_END_KEY, updatedLeaseEndTime);
-        update.set(STATE, ACTIVE.name());
 
-        if (renewed && !leaseCheckDisabled) {
+        if (!leaseCheckDisabled) {
             // if leaseCheckDisabled, then we just update the lease without
             // checking
             // OAK-3398:
@@ -829,12 +863,12 @@ public class ClusterNodeInfo {
             // then we can now make an assertion that the lease is unchanged
             // and the incremental update must only succeed if no-one else
             // did a recover/inactivation in the meantime
-            // make two assertions: the leaseEnd must match ..
+            // make three assertions: the leaseEnd must match ..
             update.equals(LEASE_END_KEY, null, previousLeaseEndTime);
             // plus it must still be active ..
             update.equals(STATE, null, ACTIVE.name());
             // plus it must not have a recovery lock on it
-            update.notEquals(REV_RECOVERY_LOCK, RecoverLockState.ACQUIRED.name());
+            update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
             // @TODO: to make it 100% failure proof we could introduce
             // yet another field to clusterNodes: a runtimeId that we
             // create (UUID) at startup each time - and against that
@@ -900,7 +934,6 @@ public class ClusterNodeInfo {
                 readWriteMode = mode;
                 store.setReadWriteMode(mode);
             }
-            renewed = true;
             return true;
         } catch (DocumentStoreException e) {
             dse = e;
@@ -1014,8 +1047,8 @@ public class ClusterNodeInfo {
         UpdateOp update = new UpdateOp("" + id, true);
         update.set(LEASE_END_KEY, null);
         update.set(STATE, null);
-        update.set(REV_RECOVERY_LOCK, RecoverLockState.NONE.name());
         store.createOrUpdate(Collection.CLUSTER_NODES, update);
+        state = NONE;
     }
 
     @Override
@@ -1028,7 +1061,6 @@ public class ClusterNodeInfo {
                 "uuid: " + uuid + ",\n" +
                 "readWriteMode: " + readWriteMode + ",\n" +
                 "state: " + state + ",\n" +
-                "revLock: " + revRecoveryLock + ",\n" +
                 "oakVersion: " + OAK_VERSION + ",\n" +
                 "formatVersion: " + DocumentNodeStore.VERSION;
     }
@@ -1080,7 +1112,7 @@ public class ClusterNodeInfo {
      *
      * @return the unique id
      */
-    private static String getMachineId() {
+    private static String getHardwareMachineId() {
         Exception exception = null;
         try {
             ArrayList<String> macAddresses = new ArrayList<String>();
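
The reworked ClusterNodeInfo.getInstance above now takes a RecoveryHandler and greedily acquires (and, if necessary, recovers) an inactive clusterId instead of carrying over the previous state and lease. A minimal usage sketch, assuming the class lives in the org.apache.jackrabbit.oak.plugins.document package (as the tests added in this commit do), that RecoveryHandlerImpl is constructed as in the DocumentNodeStore change further down, and that the in-memory store stands in for a real backend:

    package org.apache.jackrabbit.oak.plugins.document;

    import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
    import org.apache.jackrabbit.oak.stats.Clock;

    public class AcquireClusterIdDemo {   // hypothetical demo class
        public static void main(String[] args) {
            DocumentStore store = new MemoryDocumentStore();
            Clock clock = Clock.SIMPLE;
            MissingLastRevSeeker seeker = new MissingLastRevSeeker(store, clock);
            RecoveryHandler recoveryHandler = new RecoveryHandlerImpl(store, clock, seeker);
            // null machineId/instanceId fall back to the MAC address and the
            // current working directory; clusterId 0 asks for dynamic assignment
            ClusterNodeInfo info = ClusterNodeInfo.getInstance(
                    store, recoveryHandler, null, null, 0);
            System.out.println("Acquired clusterId " + info.getId());
            info.dispose();
        }
    }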

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java Tue Jul  3 15:25:38 2018
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Comparator;
+
+/**
+ * Comparator for {@link ClusterNodeInfo} instances based on a given environment
+ * defined by a {@code machineId} and {@code instanceId}. The comparator orders
+ * cluster node info with a matching environment before others that do not match
+ * and then compares the clusterId, lower values first.
+ */
+class ClusterNodeInfoComparator implements Comparator<ClusterNodeInfo> {
+
+    private static final Comparator<Boolean> BOOLEAN_REVERSED = Comparator.comparing(Boolean::booleanValue).reversed();
+
+    private final String machineId;
+    private final String instanceId;
+
+    ClusterNodeInfoComparator(String machineId,
+                              String instanceId) {
+        this.machineId = machineId;
+        this.instanceId = instanceId;
+    }
+
+    @Override
+    public int compare(ClusterNodeInfo info1,
+                       ClusterNodeInfo info2) {
+        // first compare whether the environment matches
+        return Comparator.comparing(this::matchesEnvironment, BOOLEAN_REVERSED)
+                // then compare the clusterIds
+                .thenComparingInt(ClusterNodeInfo::getId).compare(info1, info2);
+    }
+
+    private boolean matchesEnvironment(ClusterNodeInfo info) {
+        return machineId.equals(info.getMachineId())
+                && instanceId.equals(info.getInstanceId());
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native
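
An illustrative sketch of the ordering this comparator produces, using a hypothetical IdEnv class to stand in for ClusterNodeInfo (whose constructor is private): entries whose machineId/instanceId match the local environment sort before all others, and within each group the lower clusterId wins:

    import java.util.Comparator;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class IdEnvDemo {
        // hypothetical stand-in for ClusterNodeInfo
        static final class IdEnv {
            final int id; final String machineId; final String instanceId;
            IdEnv(int id, String machineId, String instanceId) {
                this.id = id; this.machineId = machineId; this.instanceId = instanceId;
            }
        }

        public static void main(String[] args) {
            String machineId = "mac:aa-bb", instanceId = "/repo";
            // same ordering rule as ClusterNodeInfoComparator(machineId, instanceId):
            // matching environment first (false sorts before true), then lowest id
            Comparator<IdEnv> cmp = Comparator
                    .comparing((IdEnv e) -> !(machineId.equals(e.machineId)
                            && instanceId.equals(e.instanceId)))
                    .thenComparingInt(e -> e.id);

            SortedSet<IdEnv> candidates = new TreeSet<>(cmp);
            candidates.add(new IdEnv(1, "mac:cc-dd", "/other")); // lower id, wrong environment
            candidates.add(new IdEnv(3, "mac:aa-bb", "/repo"));  // matching environment
            candidates.add(new IdEnv(5, "mac:aa-bb", "/repo"));  // matching environment
            System.out.println(candidates.first().id);           // prints 3, not 1
        }
    }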

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java Tue Jul  3 15:25:38 2018
@@ -82,6 +82,23 @@ public class ClusterNodeInfoDocument ext
     }
 
     /**
+     * Check if _lastRev recovery is needed for cluster node info document.
+     * Returns {@code true} if both of the below conditions are {@code true}:
+     * <ul>
+     *     <li>State is Active</li>
+     *     <li>Current time is past the leaseEnd time or there is a recovery
+     *          lock on the cluster node info document</li>
+     * </ul>
+     * @param currentTimeMillis the current time in milliseconds since the start
+     *                          start of the epoch.
+     */
+    public boolean isRecoveryNeeded(long currentTimeMillis) {
+        return isActive() &&
+                (currentTimeMillis > getLeaseEndTime() ||
+                        isBeingRecovered());
+    }
+
+    /**
      * Returns {@code true} if the cluster node represented by this document
      * is currently being recovered by the given {@code clusterId}.
      *
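
A small sketch of how the new check is consumed elsewhere in this commit (see the LastRevRecoveryAgent change below); seeker, clock and agent are assumed to be the MissingLastRevSeeker, Clock and LastRevRecoveryAgent of the local DocumentNodeStore:

    ClusterNodeInfoDocument doc = seeker.getClusterNodeInfo(clusterId);
    if (doc != null && doc.isRecoveryNeeded(clock.getTime())) {
        // the entry is ACTIVE and either its lease expired or a recovery
        // lock is set on it, so run _lastRev recovery for that clusterId
        agent.recover(clusterId);
    }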

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Jul  3 15:25:38 2018
@@ -29,7 +29,6 @@ import static java.util.concurrent.TimeU
 import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
 import static org.apache.jackrabbit.oak.commons.PathUtils.ROOT_PATH;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
-import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder.MANY_CHILDREN_THRESHOLD;
@@ -177,14 +176,6 @@ public final class DocumentNodeStore
     private boolean fairBackgroundOperationLock =
             Boolean.parseBoolean(System.getProperty("oak.fairBackgroundOperationLock", "true"));
 
-    /**
-     * The timeout in milliseconds to wait for the recovery performed by
-     * another cluster node.
-     */
-    private long recoveryWaitTimeoutMS =
-            Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
-
-
     public static final String SYS_PROP_DISABLE_JOURNAL = "oak.disableJournalDiff";
     /**
      * Feature flag to disable the journal diff mechanism. See OAK-4528.
@@ -537,22 +528,24 @@ public final class DocumentNodeStore
             readOnlyMode = false;
         }
         checkVersion(s, readOnlyMode);
+        this.nonLeaseCheckingStore = s;
         this.executor = builder.getExecutor();
+        this.lastRevSeeker = builder.createMissingLastRevSeeker();
         this.clock = builder.getClock();
 
         int cid = builder.getClusterId();
         cid = Integer.getInteger("oak.documentMK.clusterId", cid);
         if (readOnlyMode) {
-            clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(s);
+            clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(nonLeaseCheckingStore);
         } else {
-            clusterNodeInfo = ClusterNodeInfo.getInstance(s, cid);
+            clusterNodeInfo = ClusterNodeInfo.getInstance(nonLeaseCheckingStore,
+                    new RecoveryHandlerImpl(nonLeaseCheckingStore, clock, lastRevSeeker),
+                    null, null, cid);
         }
         // TODO we should ensure revisions generated from now on
         // are never "older" than revisions already in the repository for
         // this cluster id
-        cid = clusterNodeInfo.getId();
-
-        this.nonLeaseCheckingStore = s;
+        this.clusterId = clusterNodeInfo.getId();
 
         if (builder.getLeaseCheck()) {
             s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
@@ -560,11 +553,21 @@ public final class DocumentNodeStore
         } else {
             clusterNodeInfo.setLeaseCheckDisabled(true);
         }
+        String threadNamePostfix = "(" + clusterId + ")";
+        leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, isDisposed),
+                "DocumentNodeStore lease update thread " + threadNamePostfix);
+        leaseUpdateThread.setDaemon(true);
+        if (!readOnlyMode) {
+            // OAK-3398 : make lease updating more robust by ensuring it
+            // has higher likelihood of succeeding than other threads
+            // on a very busy machine - so as to prevent lease timeout.
+            leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
+            leaseUpdateThread.start();
+        }
 
         this.journalPropertyHandlerFactory = builder.getJournalPropertyHandlerFactory();
         this.store = s;
         this.changes = newJournalEntry();
-        this.clusterId = cid;
         this.branches = new UnmergedBranches();
         this.asyncDelay = builder.getAsyncDelay();
         this.versionGarbageCollector = new VersionGarbageCollector(
@@ -575,8 +578,8 @@ public final class DocumentNodeStore
                 this, builder.getJournalGCMaxAge());
         this.referencedBlobs =
                 builder.createReferencedBlobs(this);
-        this.lastRevSeeker = builder.createMissingLastRevSeeker();
-        this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this, lastRevSeeker);
+        this.lastRevRecoveryAgent = new LastRevRecoveryAgent(store, this,
+                lastRevSeeker, clusterId -> this.signalClusterStateChange());
         this.disableBranches = builder.isDisableBranches();
         this.missing = new DocumentNodeState(this, "MISSING",
                 new RevisionVector(new Revision(0, 0, 0))) {
@@ -597,7 +600,6 @@ public final class DocumentNodeStore
                 builder.getWeigher(), builder.getChildrenCacheSize());
 
         diffCache = builder.getDiffCache(this.clusterId);
-        checkpoints = new Checkpoints(this);
 
         // check if root node exists
         NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath("/"));
@@ -625,9 +627,6 @@ public final class DocumentNodeStore
             }
         } else {
             sweepRevisions = sweepRevisions.pmax(rootDoc.getSweepRevisions());
-            if (!readOnlyMode) {
-                checkLastRevRecovery();
-            }
             initializeRootState(rootDoc);
             // check if _lastRev for our clusterId exists
             if (!rootDoc.getLastRev().containsKey(clusterId)) {
@@ -647,9 +646,7 @@ public final class DocumentNodeStore
             }
         }
 
-        // Renew the lease because it may have been stale
-        renewClusterIdLease();
-
+        checkpoints = new Checkpoints(this);
         // initialize branchCommits
         branches.init(store, this);
 
@@ -657,7 +654,6 @@ public final class DocumentNodeStore
                 new PrefetchDispatcher(getRoot(), executor) :
                 new ChangeDispatcher(getRoot());
         commitQueue = new CommitQueue(this);
-        String threadNamePostfix = "(" + clusterId + ")";
         batchCommitQueue = new BatchCommitQueue(store);
         // prepare background threads
         backgroundReadThread = new Thread(
@@ -675,19 +671,10 @@ public final class DocumentNodeStore
         clusterUpdateThread = new Thread(new BackgroundClusterUpdate(this, isDisposed),
                 "DocumentNodeStore cluster update thread " + threadNamePostfix);
         clusterUpdateThread.setDaemon(true);
-        leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, isDisposed),
-                "DocumentNodeStore lease update thread " + threadNamePostfix);
-        leaseUpdateThread.setDaemon(true);
         // now start the background threads
         clusterUpdateThread.start();
         backgroundReadThread.start();
         if (!readOnlyMode) {
-            // OAK-3398 : make lease updating more robust by ensuring it
-            // has higher likelihood of succeeding than other threads
-            // on a very busy machine - so as to prevent lease timeout.
-            leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
-            leaseUpdateThread.start();
-
             // perform an initial document sweep if needed
             // this may be long running if there is no sweep revision
             // for this clusterId (upgrade from Oak <= 1.6).
@@ -716,31 +703,6 @@ public final class DocumentNodeStore
         }
     }
 
-
-    /**
-     * Recover _lastRev recovery if needed.
-     *
-     * @throws DocumentStoreException if recovery did not finish within
-     *          {@link #recoveryWaitTimeoutMS}.
-     */
-    private void checkLastRevRecovery() throws DocumentStoreException {
-        long timeout = clock.getTime() + recoveryWaitTimeoutMS;
-        int numRecovered = lastRevRecoveryAgent.recover(clusterId, timeout);
-        if (numRecovered == -1) {
-            ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId));
-            String otherId = "n/a";
-            if (doc != null) {
-                otherId = String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
-            }
-            String msg = "This cluster node (" + clusterId + ") requires " +
-                    "_lastRev recovery which is currently performed by " +
-                    "another cluster node (" + otherId + "). Recovery is " +
-                    "still ongoing after " + recoveryWaitTimeoutMS + " ms. " +
-                    "Failing startup of this DocumentNodeStore now!";
-            throw new DocumentStoreException(msg);
-        }
-    }
-
     public void dispose() {
         LOG.info("Starting disposal of DocumentNodeStore with clusterNodeId: {} ({})", clusterId,
                 getClusterNodeInfoDisplayString());
@@ -2985,7 +2947,7 @@ public final class DocumentNodeStore
         this.clusterStateChangeListener = clusterStateChangeListener;
     }
 
-    void signalClusterStateChange() {
+    private void signalClusterStateChange() {
         if (clusterStateChangeListener != null) {
             clusterStateChangeListener.handleClusterStateChange();
         }

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Tue Jul  3 15:25:38 2018
@@ -33,10 +33,10 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import javax.annotation.CheckForNull;
 
-import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
@@ -65,19 +65,28 @@ import org.slf4j.LoggerFactory;
 public class LastRevRecoveryAgent {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final DocumentNodeStore nodeStore;
+    private final DocumentStore store;
+
+    private final RevisionContext revisionContext;
 
     private final MissingLastRevSeeker missingLastRevUtil;
 
-    public LastRevRecoveryAgent(DocumentNodeStore nodeStore,
-                                MissingLastRevSeeker seeker) {
-        this.nodeStore = nodeStore;
+    private final Consumer<Integer> afterRecovery;
+
+    public LastRevRecoveryAgent(DocumentStore store,
+                                RevisionContext revisionContext,
+                                MissingLastRevSeeker seeker,
+                                Consumer<Integer> afterRecovery) {
+        this.store = store;
+        this.revisionContext = revisionContext;
         this.missingLastRevUtil = seeker;
+        this.afterRecovery = afterRecovery;
     }
 
-    public LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
-        this(nodeStore, new MissingLastRevSeeker(
-                nodeStore.getDocumentStore(), nodeStore.getClock()));
+    public LastRevRecoveryAgent(DocumentStore store, RevisionContext context) {
+        this(store, context,
+                new MissingLastRevSeeker(store, context.getClock()),
+                i -> {});
     }
 
     /**
@@ -85,7 +94,15 @@ public class LastRevRecoveryAgent {
      * nodes. If another cluster node is already performing the recovery for the
      * given {@code clusterId}, this method will {@code waitUntil} the given
      * time in milliseconds for the recovery to finish.
-     *
+     * <p>
+     * If recovery is performed for the clusterId as exposed by the revision
+     * context passed to the constructor of this recovery agent, then this
+     * method will put a deadline on how long recovery may take. The deadline
+     * is the current lease end as read from the {@code clusterNodes} collection
+     * entry for the {@code clusterId} to recover minus the
+     * {@link ClusterNodeInfo#DEFAULT_LEASE_FAILURE_MARGIN_MILLIS}. This method
+     * will throw a {@link DocumentStoreException} if the deadline is reached.
+     * <p>
      * This method will return:
      * <ul>
      *     <li>{@code -1} when another cluster node is busy performing recovery
@@ -104,22 +121,17 @@ public class LastRevRecoveryAgent {
      *                  already performing the recovery.
      * @return the number of restored nodes or {@code -1} if a timeout occurred
      *          while waiting for an ongoing recovery by another cluster node.
+     * @throws DocumentStoreException if the deadline is reached or some other
+     *          error occurs while reading from the underlying document store.
      */
     public int recover(int clusterId, long waitUntil)
             throws DocumentStoreException {
         ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
 
-        //TODO Currently leaseTime remains same per cluster node. If this
-        //is made configurable then it should be read from DB entry
-        final long leaseTime = ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
-        final long asyncDelay = nodeStore.getAsyncDelay();
-
         if (nodeInfo != null) {
             // Check if _lastRev recovery needed for this cluster node
             // state is Active && current time past leaseEnd
-            if (missingLastRevUtil.isRecoveryNeeded(nodeInfo)) {
-                long leaseEnd = nodeInfo.getLeaseEndTime();
-
+            if (nodeInfo.isRecoveryNeeded(revisionContext.getClock().getTime())) {
                 // retrieve the root document's _lastRev
                 NodeDocument root = missingLastRevUtil.getRoot();
                 Revision lastRev = root.getLastRev().get(clusterId);
@@ -134,10 +146,9 @@ public class LastRevRecoveryAgent {
                     startTime = lastRev.getTimestamp();
                     reason = "lastRev: " + lastRev.toString();
                 } else {
-                    startTime = leaseEnd - leaseTime - asyncDelay;
+                    startTime = nodeInfo.getStartTime();
                     reason = String.format(
-                            "no lastRev for root, using timestamp based on leaseEnd %d - leaseTime %d - asyncDelay %d", leaseEnd,
-                            leaseTime, asyncDelay);
+                            "no lastRev for root, using startTime %d", startTime);
                 }
                 if (sweepRev != null && sweepRev.getTimestamp() < startTime) {
                     startTime = sweepRev.getTimestamp();
@@ -159,24 +170,37 @@ public class LastRevRecoveryAgent {
      * @param clusterId the cluster id for which the _lastRev are to be recovered
      * @return the number of restored nodes or {@code -1} if there is an ongoing
      *          recovery by another cluster node.
+     * @throws DocumentStoreException if the deadline is reached or some other
+     *          error occurs while reading from the underlying document store.
      */
-    public int recover(int clusterId) {
+    public int recover(int clusterId) throws DocumentStoreException {
         return recover(clusterId, 0);
     }
 
     /**
-     * Recover the correct _lastRev updates for the given candidate nodes.
+     * Same as {@link #recover(Iterable, int, boolean)} with {@code dryRun} set
+     * to {@code false}.
      *
      * @param suspects the potential suspects
      * @param clusterId the cluster id for which _lastRev recovery needed
      * @return the number of documents that required recovery.
+     * @throws DocumentStoreException if the deadline is reached or some other
+     *          error occurs while reading from the underlying document store.
      */
-    public int recover(Iterable<NodeDocument> suspects, int clusterId) {
+    public int recover(Iterable<NodeDocument> suspects, int clusterId)
+            throws DocumentStoreException {
         return recover(suspects, clusterId, false);
     }
 
     /**
-     * Recover the correct _lastRev updates for the given candidate nodes.
+     * Recover the correct _lastRev updates for the given candidate nodes. If
+     * recovery is performed for the clusterId as exposed by the revision
+     * context passed to the constructor of this recovery agent, then this
+     * method will put a deadline on how long recovery may take. The deadline
+     * is the current lease end as read from the {@code clusterNodes} collection
+     * entry for the {@code clusterId} to recover minus the
+     * {@link ClusterNodeInfo#DEFAULT_LEASE_FAILURE_MARGIN_MILLIS}. This method
+     * will throw a {@link DocumentStoreException} if the deadline is reached.
      *
      * @param suspects the potential suspects
      * @param clusterId the cluster id for which _lastRev recovery needed
@@ -185,12 +209,24 @@ public class LastRevRecoveryAgent {
      * @return the number of documents that required recovery. This method
      *          returns the number of the affected documents even if
      *          {@code dryRun} is set true and no document was changed.
+     * @throws DocumentStoreException if the deadline is reached or some other
+     *          error occurs while reading from the underlying document store.
      */
     public int recover(final Iterable<NodeDocument> suspects,
                        final int clusterId, final boolean dryRun)
             throws DocumentStoreException {
-        final DocumentStore docStore = nodeStore.getDocumentStore();
-        NodeDocument rootDoc = Utils.getRootDocument(docStore);
+        // set a deadline if this is a self recovery. Self recovery does not
+        // update the lease in a background thread and must terminate before
+        // the lease acquired by the recovery lock expires.
+        long deadline = Long.MAX_VALUE;
+        if (clusterId == revisionContext.getClusterId()) {
+            ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
+            if (nodeInfo != null && nodeInfo.isActive()) {
+                deadline = nodeInfo.getLeaseEndTime() - ClusterNodeInfo.DEFAULT_LEASE_FAILURE_MARGIN_MILLIS;
+            }
+        }
+
+        NodeDocument rootDoc = Utils.getRootDocument(store);
 
         // first run a sweep
         final AtomicReference<Revision> sweepRev = new AtomicReference<>();
@@ -199,8 +235,8 @@ public class LastRevRecoveryAgent {
             // sweep revision. Initial sweep is not the responsibility
             // of the recovery agent.
             final RevisionContext context = new RecoveryContext(rootDoc,
-                    nodeStore.getClock(), clusterId,
-                    nodeStore::getCommitValue);
+                    revisionContext.getClock(), clusterId,
+                    revisionContext::getCommitValue);
             final NodeDocumentSweeper sweeper = new NodeDocumentSweeper(context, true);
             sweeper.sweep(suspects, new NodeDocumentSweepListener() {
                 @Override
@@ -213,16 +249,16 @@ public class LastRevRecoveryAgent {
                         return;
                     }
                     // create an invalidate entry
-                    JournalEntry inv = JOURNAL.newDocument(docStore);
+                    JournalEntry inv = JOURNAL.newDocument(store);
                     inv.modified(updates.keySet());
                     Revision r = context.newRevision().asBranchRevision();
                     UpdateOp invOp = inv.asUpdateOp(r);
                     // and reference it from a regular entry
-                    JournalEntry entry = JOURNAL.newDocument(docStore);
+                    JournalEntry entry = JOURNAL.newDocument(store);
                     entry.invalidate(Collections.singleton(r));
                     Revision jRev = context.newRevision();
                     UpdateOp jOp = entry.asUpdateOp(jRev);
-                    if (!docStore.create(JOURNAL, newArrayList(invOp, jOp))) {
+                    if (!store.create(JOURNAL, newArrayList(invOp, jOp))) {
                         String msg = "Unable to create journal entries for " +
                                 "document invalidation.";
                         throw new DocumentStoreException(msg);
@@ -230,7 +266,7 @@ public class LastRevRecoveryAgent {
                     sweepRev.set(Utils.max(sweepRev.get(), jRev));
                     // now that journal entry is in place, perform the actual
                     // updates on the documents
-                    docStore.createOrUpdate(NODES, newArrayList(updates.values()));
+                    store.createOrUpdate(NODES, newArrayList(updates.values()));
                     log.info("Sweeper updated {}", updates.keySet());
                 }
             });
@@ -242,7 +278,7 @@ public class LastRevRecoveryAgent {
 
         //Map of known last rev of checked paths
         Map<String, Revision> knownLastRevOrModification = MapFactory.getInstance().create();
-        final JournalEntry changes = JOURNAL.newDocument(docStore);
+        final JournalEntry changes = JOURNAL.newDocument(store);
 
         long count = 0;
         for (NodeDocument doc : suspects) {
@@ -292,7 +328,7 @@ public class LastRevRecoveryAgent {
                 // we don't know when the document was last modified with
                 // the given clusterId. need to read from store
                 String id = Utils.getIdFromPath(parentPath);
-                NodeDocument doc = docStore.find(NODES, id);
+                NodeDocument doc = store.find(NODES, id);
                 if (doc != null) {
                     Revision lastRev = doc.getLastRev().get(clusterId);
                     Revision lastMod = determineLastModification(doc, clusterId);
@@ -328,6 +364,15 @@ public class LastRevRecoveryAgent {
             log.info("Dry run of lastRev recovery identified [{}] documents for " +
                     "cluster node [{}]: {}", size, clusterId, updates);
         } else {
+            // check deadline before the update
+            if (revisionContext.getClock().getTime() > deadline) {
+                String msg = String.format("Cluster node %d was unable to " +
+                        "perform lastRev recovery for clusterId %d within " +
+                        "deadline: %s", clusterId, clusterId,
+                        Utils.timestampToString(deadline));
+                throw new DocumentStoreException(msg);
+            }
+
             //UnsavedModifications is designed to be used in concurrent
             //access mode. For recovery case there is no concurrent access
             //involve so just pass a new lock instance
@@ -336,7 +381,7 @@ public class LastRevRecoveryAgent {
             // thus it doesn't matter, where exactly the check is done
             // as to whether the recovered lastRev has already been
             // written to the journal.
-            unsaved.persist(docStore, new Supplier<Revision>() {
+            unsaved.persist(store, new Supplier<Revision>() {
                 @Override
                 public Revision get() {
                     return sweepRev.get();
@@ -357,7 +402,7 @@ public class LastRevRecoveryAgent {
                     }
 
                     final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
-                    final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id);
+                    final JournalEntry existingEntry = store.find(Collection.JOURNAL, id);
                     if (existingEntry != null) {
                         // then the journal entry was already written - as can happen if
                         // someone else (or the original instance itself) wrote the
@@ -368,7 +413,7 @@ public class LastRevRecoveryAgent {
                     }
 
                     // otherwise store a new journal entry now
-                    docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
+                    store.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
                 }
             }, new ReentrantLock());
 
@@ -379,9 +424,19 @@ public class LastRevRecoveryAgent {
         return size;
     }
 
+    //--------------------------< internal >------------------------------------
+
     /**
      * Retrieves possible candidates which have been modified after the given
      * {@code startTime} and recovers the missing updates.
+     * <p>
+     * If recovery is performed for the clusterId as exposed by the revision
+     * context passed to the constructor of this recovery agent, then this
+     * method will put a deadline on how long recovery may take. The deadline
+     * is the current lease end as read from the {@code clusterNodes} collection
+     * entry for the {@code clusterId} to recover minus the
+     * {@link ClusterNodeInfo#DEFAULT_LEASE_FAILURE_MARGIN_MILLIS}. This method
+     * will throw a {@link DocumentStoreException} if the deadline is reached.
      *
      * @param nodeInfo the info of the cluster node to recover.
      * @param startTime the start time
@@ -391,20 +446,23 @@ public class LastRevRecoveryAgent {
      * @return the number of restored nodes or {@code -1} if recovery is still
      *      ongoing by another process even when {@code waitUntil} time was
      *      reached.
+     * @throws DocumentStoreException if the deadline is reached or some other
+     *          error occurs while reading from the underlying document store.
      */
     private int recoverCandidates(final ClusterNodeInfoDocument nodeInfo,
                                   final long startTime,
                                   final long waitUntil,
-                                  final String info) {
+                                  final String info)
+            throws DocumentStoreException {
         ClusterNodeInfoDocument infoDoc = nodeInfo;
         int clusterId = infoDoc.getClusterId();
         for (;;) {
             if (missingLastRevUtil.acquireRecoveryLock(
-                    clusterId, nodeStore.getClusterId())) {
+                    clusterId, revisionContext.getClusterId())) {
                 break;
             }
 
-            Clock clock = nodeStore.getClock();
+            Clock clock = revisionContext.getClock();
             long remaining = waitUntil - clock.getTime();
             if (remaining < 0) {
                 // no need to wait for lock release, waitUntil already reached
@@ -426,7 +484,12 @@ public class LastRevRecoveryAgent {
                 throw new DocumentStoreException(msg, e);
             }
             infoDoc = missingLastRevUtil.getClusterNodeInfo(clusterId);
-            if (!missingLastRevUtil.isRecoveryNeeded(infoDoc)) {
+            if (infoDoc == null) {
+                String msg = String.format("No cluster node info document " +
+                        "for id %d", clusterId);
+                throw new DocumentStoreException(msg);
+            }
+            if (!infoDoc.isRecoveryNeeded(clock.getTime())) {
                 // meanwhile another process finished recovery
                 return 0;
             }
@@ -449,8 +512,7 @@ public class LastRevRecoveryAgent {
             }
         } finally {
             missingLastRevUtil.releaseRecoveryLock(clusterId, success);
-
-            nodeStore.signalClusterStateChange();
+            afterRecovery.accept(clusterId);
         }
     }
 
@@ -474,7 +536,7 @@ public class LastRevRecoveryAgent {
             // collect committed changes of this cluster node
             for (Map.Entry<Revision, String> entry : filterKeys(valueMap, cp).entrySet()) {
                 Revision rev = entry.getKey();
-                String cv = nodeStore.getCommitValue(rev, doc);
+                String cv = revisionContext.getCommitValue(rev, doc);
                 if (isCommitted(cv)) {
                     lastModified = Utils.max(lastModified, resolveCommitRevision(rev, cv));
                     break;
@@ -501,7 +563,7 @@ public class LastRevRecoveryAgent {
         if (isRecoveryNeeded()) {
             Iterable<Integer> clusterIds = getRecoveryCandidateNodes();
             log.info("ClusterNodeId [{}] starting Last Revision Recovery for clusterNodeId(s) {}",
-                    nodeStore.getClusterId(), clusterIds);
+                    revisionContext.getClusterId(), clusterIds);
             for (int clusterId : clusterIds) {
                 if (recover(clusterId) == -1) {
                     log.info("Last Revision Recovery for cluster node {} " +
@@ -524,14 +586,10 @@ public class LastRevRecoveryAgent {
                 new Predicate<ClusterNodeInfoDocument>() {
             @Override
             public boolean apply(ClusterNodeInfoDocument input) {
-                return nodeStore.getClusterId() != input.getClusterId() && missingLastRevUtil.isRecoveryNeeded(input);
-            }
-        }), new Function<ClusterNodeInfoDocument, Integer>() {
-            @Override
-            public Integer apply(ClusterNodeInfoDocument input) {
-                return input.getClusterId();
+                return revisionContext.getClusterId() != input.getClusterId()
+                        && input.isRecoveryNeeded(revisionContext.getClock().getTime());
             }
-        });
+        }), ClusterNodeInfoDocument::getClusterId);
     }
 
     private static class ClusterPredicate implements Predicate<Revision> {
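
For orientation, a minimal sketch of how the reworked agent is meant to be driven;
this mirrors what RecoveryHandlerImpl further below does. The store, revisionContext,
lastRevSeeker, clusterId and LOG variables are assumed to exist and are not part of
this patch:

    // construct the agent with an afterRecovery callback (Consumer<Integer>)
    LastRevRecoveryAgent agent = new LastRevRecoveryAgent(
            store, revisionContext, lastRevSeeker,
            id -> LOG.info("lastRev recovery finished for clusterId {}", id));
    // wait up to one minute for a recovery that may already be running
    // on another cluster node; -1 means it is still ongoing after that
    long waitUntil = revisionContext.getClock().getTime() + 60000;
    int recovered = agent.recover(clusterId, waitUntil);
    if (recovered == -1) {
        LOG.info("lastRev recovery for {} still ongoing elsewhere", clusterId);
    }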

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Tue Jul  3 15:25:38 2018
@@ -19,24 +19,17 @@
 
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.stream.StreamSupport;
+
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
-import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.stats.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_BY;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_LOCK;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.STATE;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
@@ -47,22 +40,12 @@ import static org.apache.jackrabbit.oak.
  */
 public class MissingLastRevSeeker {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MissingLastRevSeeker.class);
-
     protected final String ROOT_PATH = "/";
 
     private final DocumentStore store;
 
     protected final Clock clock;
 
-    private final Predicate<ClusterNodeInfoDocument> isRecoveryNeeded =
-            new Predicate<ClusterNodeInfoDocument>() {
-        @Override
-        public boolean apply(ClusterNodeInfoDocument nodeInfo) {
-            return isRecoveryNeeded(nodeInfo);
-        }
-    };
-
     public MissingLastRevSeeker(DocumentStore store, Clock clock) {
         this.store = store;
         this.clock = clock;
@@ -123,21 +106,8 @@ public class MissingLastRevSeeker {
      * @return whether the lock has been acquired
      */
     public boolean acquireRecoveryLock(int clusterId, int recoveredBy) {
-        ClusterNodeInfoDocument doc = getClusterNodeInfo(clusterId);
-        if (doc == null) {
-            // this is unexpected...
-            return false;
-        }
-        if (!isRecoveryNeeded(doc)) {
-            return false;
-        }
-        boolean acquired = tryAcquireRecoveryLock(doc, recoveredBy);
-        if (acquired) {
-            return true;
-        }
-        // either we already own the lock or were able to break the lock
-        return doc.isBeingRecoveredBy(recoveredBy)
-                || tryBreakRecoveryLock(doc, recoveredBy);
+        return new RecoveryLock(store, clock, clusterId)
+                .acquireRecoveryLock(recoveredBy);
     }
 
     /**
@@ -152,117 +122,34 @@ public class MissingLastRevSeeker {
      * @param success whether recovery was successful.
      */
     public void releaseRecoveryLock(int clusterId, boolean success) {
-        try {
-            UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
-            update.set(REV_RECOVERY_LOCK, RecoverLockState.NONE.name());
-            update.set(REV_RECOVERY_BY, null);
-            if (success) {
-                update.set(STATE, null);
-            }
-            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
-            if (old == null) {
-                throw new RuntimeException("ClusterNodeInfo document for " + clusterId + " missing.");
-            }
-            LOG.info("Released recovery lock for cluster id {} (recovery successful: {})",
-                    clusterId, success);
-        } catch (RuntimeException ex) {
-            LOG.error("Failed to release the recovery lock for clusterNodeId " + clusterId, ex);
-            throw (ex);
-        }
+        new RecoveryLock(store, clock, clusterId).releaseRecoveryLock(success);
     }
 
     public NodeDocument getRoot() {
         return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
     }
 
-    public boolean isRecoveryNeeded() {
-        return Iterables.any(getAllClusters(), isRecoveryNeeded);
-    }
-
     /**
-     * Check if _lastRev recovery needed for this cluster node
-     * state is Active and currentTime past the leaseEnd time
+     * Returns {@code true} if any of the cluster node info documents satisfies
+     * {@link ClusterNodeInfoDocument#isRecoveryNeeded(long)} where the passed
+     * timestamp is the current time.
+     *
+     * @return {@code true} if any of the cluster nodes need recovery,
+     *          {@code false} otherwise.
      */
-    public boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) {
-        return nodeInfo.isActive() && clock.getTime() > nodeInfo.getLeaseEndTime();
+    public boolean isRecoveryNeeded() {
+        long now = clock.getTime();
+        return StreamSupport.stream(getAllClusters().spliterator(), false)
+                .anyMatch(info -> info != null && info.isRecoveryNeeded(now));
     }
 
-    //-------------------------< internal >-------------------------------------
-
     /**
-     * Acquire a recovery lock for the given cluster node info document
+     * Same as {@link ClusterNodeInfoDocument#isRecoveryNeeded(long)}.
      *
-     * @param info
-     *            info document of the cluster that is going to be recovered
-     * @param recoveredBy
-     *            id of cluster doing the recovery ({@code 0} when unknown)
-     * @return whether the lock has been acquired
+     * @deprecated use {@link ClusterNodeInfoDocument#isRecoveryNeeded(long)}
+     *          instead.
      */
-    private boolean tryAcquireRecoveryLock(ClusterNodeInfoDocument info,
-                                           int recoveredBy) {
-        int clusterId = info.getClusterId();
-        try {
-            UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
-            update.equals(STATE, ACTIVE.name());
-            update.equals(LEASE_END_KEY, info.getLeaseEndTime());
-            update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
-            update.set(REV_RECOVERY_LOCK, ACQUIRED.name());
-            if (recoveredBy != 0) {
-                update.set(REV_RECOVERY_BY, recoveredBy);
-            }
-            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
-            if (old != null) {
-                LOG.info("Acquired recovery lock for cluster id {}", clusterId);
-            }
-            return old != null;
-        } catch (RuntimeException ex) {
-            LOG.error("Failed to acquire the recovery lock for clusterNodeId " + clusterId, ex);
-            throw (ex);
-        }
-    }
-
-    /**
-     * Checks if the recovering cluster node is inactive and then tries to
-     * break the recovery lock.
-     *
-     * @param doc the cluster node info document of the cluster node to acquire
-     *            the recovery lock for.
-     * @param recoveredBy id of cluster doing the recovery.
-     * @return whether the lock has been acquired.
-     */
-    private boolean tryBreakRecoveryLock(ClusterNodeInfoDocument doc,
-                                         int recoveredBy) {
-        Long recoveryBy = doc.getRecoveryBy();
-        if (recoveryBy == null) {
-            // cannot determine current lock owner
-            return false;
-        }
-        ClusterNodeInfoDocument recovering = getClusterNodeInfo(recoveryBy.intValue());
-        if (recovering == null) {
-            // cannot determine current lock owner
-            return false;
-        }
-        if (recovering.isActive() && recovering.getLeaseEndTime() > clock.getTime()) {
-            // still active, cannot break lock
-            return false;
-        }
-        // try to break the lock
-        try {
-            UpdateOp update = new UpdateOp(Integer.toString(doc.getClusterId()), false);
-            update.equals(STATE, ACTIVE.name());
-            update.equals(REV_RECOVERY_LOCK, ACQUIRED.name());
-            update.equals(REV_RECOVERY_BY, recoveryBy);
-            update.set(REV_RECOVERY_BY, recoveredBy);
-            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
-            if (old != null) {
-                LOG.info("Acquired (broke) recovery lock for cluster id {}. " +
-                        "Previous lock owner: {}", doc.getClusterId(), recoveryBy);
-            }
-            return old != null;
-        } catch (RuntimeException ex) {
-            LOG.error("Failed to break the recovery lock for clusterNodeId " +
-                    doc.getClusterId(), ex);
-            throw (ex);
-        }
+    public boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) {
+        return nodeInfo.isRecoveryNeeded(clock.getTime());
     }
 }
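
The seeker's lock methods now simply delegate to the new RecoveryLock class added
below, so the usual caller pattern stays the same. A hedged sketch, assuming store,
clock, failedClusterId and myClusterId are available in the caller's scope (none of
this is part of the patch):

    MissingLastRevSeeker seeker = new MissingLastRevSeeker(store, clock);
    if (seeker.isRecoveryNeeded()
            && seeker.acquireRecoveryLock(failedClusterId, myClusterId)) {
        boolean success = false;
        try {
            // ... perform the actual lastRev recovery for failedClusterId ...
            success = true;
        } finally {
            seeker.releaseRecoveryLock(failedClusterId, success);
        }
    }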

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java Tue Jul  3 15:25:38 2018
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * This handler gets called back when recovery is needed for a clusterId. An
+ * implementation then tries to perform the recovery and returns whether the
+ * recovery was successful. Upon successful recovery, the clusterId will have
+ * transitioned to the inactive state.
+ */
+interface RecoveryHandler {
+
+    /**
+     * A no-op recovery handler, always returning false.
+     */
+    RecoveryHandler NOOP = clusterId -> false;
+
+    /**
+     * Perform recovery for the given clusterId and return whether the recovery
+     * was successful.
+     *
+     * @param clusterId perform recovery for this clusterId.
+     * @return {@code true} if recovery was successful, {@code false} otherwise.
+     */
+    boolean recover(int clusterId);
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native
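
Since the interface has a single abstract method, implementations can be supplied as
lambdas, just like the NOOP constant above. A purely illustrative example (not part
of this commit) of a handler that only logs and reports failure:

    RecoveryHandler loggingNoop = clusterId -> {
        // log the request but do not attempt recovery
        LoggerFactory.getLogger(RecoveryHandler.class).warn(
                "recovery requested for clusterId {} but not performed", clusterId);
        return false;
    };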

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java Tue Jul  3 15:25:38 2018
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
+
+/**
+ * Implements the recovery handler on startup of a {@link DocumentNodeStore}.
+ */
+class RecoveryHandlerImpl implements RecoveryHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecoveryHandlerImpl.class);
+
+    private static final int COMMIT_VALUE_CACHE_SIZE = 10000;
+
+    /**
+     * The timeout in milliseconds to wait for the recovery performed by
+     * another cluster node.
+     */
+    private long recoveryWaitTimeoutMS =
+            Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
+
+    private final DocumentStore store;
+    private final Clock clock;
+    private final MissingLastRevSeeker lastRevSeeker;
+
+    RecoveryHandlerImpl(DocumentStore store,
+                        Clock clock,
+                        MissingLastRevSeeker lastRevSeeker) {
+        this.store = store;
+        this.clock = clock;
+        this.lastRevSeeker = lastRevSeeker;
+    }
+
+    @Override
+    public boolean recover(int clusterId) {
+        try {
+            return recoverInternal(clusterId);
+        } catch (DocumentStoreException e) {
+            LOG.warn("Recovery failed for cluster node {}", clusterId, e);
+            return false;
+        }
+    }
+
+    private boolean recoverInternal(int clusterId)
+            throws DocumentStoreException {
+        NodeDocument root = Utils.getRootDocument(store);
+        // prepare a context for recovery
+        RevisionContext context = new RecoveryContext(
+                root, clock, clusterId,
+                new CachingCommitValueResolver(COMMIT_VALUE_CACHE_SIZE, root::getSweepRevisions));
+        LastRevRecoveryAgent agent = new LastRevRecoveryAgent(
+                store, context, lastRevSeeker, id -> {});
+        long timeout = context.getClock().getTime() + recoveryWaitTimeoutMS;
+        int numRecovered = agent.recover(clusterId, timeout);
+        if (numRecovered == -1) {
+            ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId));
+            String otherId = "n/a";
+            if (doc != null) {
+                otherId = String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
+            }
+            String msg = "This cluster node (" + clusterId + ") requires " +
+                    "_lastRev recovery, which is currently being performed by " +
+                    "another cluster node (" + otherId + "). Recovery is " +
+                    "still ongoing after " + recoveryWaitTimeoutMS + " ms.";
+            LOG.info(msg);
+            return false;
+        }
+
+        return true;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native
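
A hedged construction sketch; store, clock and crashedClusterId are assumed, and
since both types are package-private such a caller would have to live in
org.apache.jackrabbit.oak.plugins.document. The wait for a recovery that is already
running on another cluster node is bounded by the oak.recoveryWaitTimeoutMS system
property, 60000 ms by default:

    MissingLastRevSeeker seeker = new MissingLastRevSeeker(store, clock);
    RecoveryHandler handler = new RecoveryHandlerImpl(store, clock, seeker);
    // returns whether recovery was successful (see the RecoveryHandler javadoc)
    boolean recovered = handler.recover(crashedClusterId);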

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java Tue Jul  3 15:25:38 2018
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_BY;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_LOCK;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.STATE;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
+
+/**
+ * Utility class to acquire and release a recovery lock on an entry in the
+ * {@link Collection#CLUSTER_NODES} collection.
+ */
+class RecoveryLock {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecoveryLock.class);
+
+    private final DocumentStore store;
+
+    private final Clock clock;
+
+    private final int clusterId;
+
+    /**
+     * Prepare a recovery lock on the document store for an entry with the given
+     * {@code clusterId}. Constructing the lock does not check whether an entry
+     * actually exists for the {@code clusterId} and is indeed valid.
+     *
+     * @param store the store where the cluster node entries are stored.
+     * @param clock the clock used to check whether an entry's lease expired.
+     * @param clusterId the {@code clusterId} this lock is created for.
+     */
+    RecoveryLock(DocumentStore store, Clock clock, int clusterId) {
+        this.store = store;
+        this.clock = clock;
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * Acquire a recovery lock for the cluster node info entry with the
+     * {@code clusterId} specified in the constructor of this recovery lock.
+     * This method may break a lock when it determines the cluster node holding
+     * the recovery lock is no longer active or its lease has expired.
+     *
+     * @param recoveredBy id of cluster doing the recovery
+     * @return whether the lock has been acquired
+     */
+    boolean acquireRecoveryLock(int recoveredBy) {
+        ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId));
+        if (doc == null) {
+            // this is unexpected...
+            return false;
+        }
+        if (!doc.isRecoveryNeeded(clock.getTime())) {
+            return false;
+        }
+        if (tryAcquireRecoveryLock(doc, recoveredBy)) {
+            return true;
+        }
+        // either we already own the lock or were able to break the lock
+        return doc.isBeingRecoveredBy(recoveredBy)
+                || tryBreakRecoveryLock(doc, recoveredBy);
+    }
+
+    /**
+     * Releases the recovery lock on the given {@code clusterId}. If
+     * {@code success} is {@code true}, the state of the cluster node entry
+     * is reset, otherwise it is left as is. That is, for a cluster node which
+     * requires recovery and the recovery process failed, the state will still
+     * be active, when this release method is called with {@code success} set
+     * to {@code false}.
+     *
+     * @param success whether recovery was successful.
+     */
+    void releaseRecoveryLock(boolean success) {
+        try {
+            UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
+            update.set(REV_RECOVERY_LOCK, ClusterNodeInfo.RecoverLockState.NONE.name());
+            update.set(REV_RECOVERY_BY, null);
+            if (success) {
+                update.set(STATE, null);
+                update.set(LEASE_END_KEY, null);
+            } else {
+                // make sure lease is expired
+                update.set(LEASE_END_KEY, clock.getTime() - 1);
+            }
+            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
+            if (old == null) {
+                throw new RuntimeException("ClusterNodeInfo document for " + clusterId + " missing.");
+            }
+            LOG.info("Released recovery lock for cluster id {} (recovery successful: {})",
+                    clusterId, success);
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to release the recovery lock for clusterNodeId " + clusterId, ex);
+            throw (ex);
+        }
+    }
+
+    //-------------------------------< internal >-------------------------------
+
+    /**
+     * Acquire a recovery lock for the given cluster node info document
+     *
+     * @param info
+     *            info document of the cluster that is going to be recovered
+     * @param recoveredBy
+     *            id of cluster doing the recovery ({@code 0} when unknown)
+     * @return whether the lock has been acquired
+     */
+    private boolean tryAcquireRecoveryLock(ClusterNodeInfoDocument info,
+                                           int recoveredBy) {
+        int clusterId = info.getClusterId();
+        try {
+            UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
+            update.equals(STATE, ACTIVE.name());
+            update.equals(LEASE_END_KEY, info.getLeaseEndTime());
+            update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
+            update.set(REV_RECOVERY_LOCK, ACQUIRED.name());
+            // Renew the lease once to give the recovery some time to finish
+            // in case recovery is done by the same clusterId. In this scenario
+            // the lease is not updated by a background thread.
+            update.set(LEASE_END_KEY, clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS);
+            if (recoveredBy != 0) {
+                update.set(REV_RECOVERY_BY, recoveredBy);
+            }
+            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
+            if (old != null) {
+                LOG.info("Acquired recovery lock for cluster id {}", clusterId);
+            }
+            return old != null;
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to acquire the recovery lock for clusterNodeId " + clusterId, ex);
+            throw (ex);
+        }
+    }
+
+    /**
+     * Checks if the recovering cluster node is inactive and then tries to
+     * break the recovery lock.
+     *
+     * @param doc the cluster node info document of the cluster node to acquire
+     *            the recovery lock for.
+     * @param recoveredBy id of cluster doing the recovery.
+     * @return whether the lock has been acquired.
+     */
+    private boolean tryBreakRecoveryLock(ClusterNodeInfoDocument doc,
+                                         int recoveredBy) {
+        Long recoveryBy = doc.getRecoveryBy();
+        if (recoveryBy == null) {
+            // cannot determine current lock owner
+            return false;
+        }
+        ClusterNodeInfoDocument recovering = store.find(CLUSTER_NODES, String.valueOf(recoveryBy));
+        if (recovering == null) {
+            // cannot determine current lock owner
+            return false;
+        }
+        long now = clock.getTime();
+        long leaseEnd = recovering.getLeaseEndTime();
+        if (recovering.isActive() && leaseEnd > now) {
+            // still active, cannot break lock
+            return false;
+        }
+        // try to break the lock
+        try {
+            UpdateOp update = new UpdateOp(Integer.toString(doc.getClusterId()), false);
+            update.equals(STATE, ACTIVE.name());
+            update.equals(REV_RECOVERY_LOCK, ACQUIRED.name());
+            update.equals(REV_RECOVERY_BY, recoveryBy);
+            update.set(REV_RECOVERY_BY, recoveredBy);
+            // Renew the lease once to give the recovery some time to finish
+            // in case recovery is done by the same clusterId. In this scenario
+            // the lease is not updated by a background thread.
+            update.set(LEASE_END_KEY, clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS);
+            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
+            if (old != null) {
+                LOG.info("Acquired (broke) recovery lock for cluster id {}. " +
+                        "Previous lock owner: {}", doc.getClusterId(), recoveryBy);
+            }
+            return old != null;
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to break the recovery lock for clusterNodeId " +
+                    doc.getClusterId(), ex);
+            throw (ex);
+        }
+    }
+}
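
A hedged sketch of using the lock directly (again package-private; store, clock and
the cluster ids are assumed). Note the release semantics in the code above: on
success the state and lease end are cleared, on failure the entry stays active with
an already expired lease so recovery can be retried:

    RecoveryLock lock = new RecoveryLock(store, clock, failedClusterId);
    if (lock.acquireRecoveryLock(recoveringClusterId)) {
        boolean success = false;
        try {
            // ... run the LastRevRecoveryAgent for failedClusterId ...
            success = true;
        } finally {
            lock.releaseRecoveryLock(success);
        }
    }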