You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/02 08:12:42 UTC

svn commit: r1583888 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: chetanm
Date: Wed Apr  2 06:12:41 2014
New Revision: 1583888

URL: http://svn.apache.org/r1583888
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)

Applying patch from Amit Jain

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java   (contents, props changed)
      - copied, changed from r1583882, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Wed Apr  2 06:12:41 2014
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
+
 import java.lang.management.ManagementFactory;
 import java.net.NetworkInterface;
 import java.util.ArrayList;
@@ -26,11 +28,10 @@ import java.util.UUID;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
-
 /**
  * Information about a cluster node.
  */
@@ -57,7 +58,28 @@ public class ClusterNodeInfo {
     /**
      * The end of the lease.
      */
-    private static final String LEASE_END_KEY = "leaseEnd";
+    protected static final String LEASE_END_KEY = "leaseEnd";
+
+    /**
+     * The state of the cluster.
+     * On proper shutdown the state should be cleared.
+     */
+    protected static final String STATE = "state";
+
+    /**
+     * Flag to indicate whether the _lastRev recovery is in progress.
+     */
+    protected static final String REV_RECOVERY_LOCK = "revLock";
+
+    /**
+     * Active State.
+     */
+    private static final String ACTIVE_STATE = "active";
+
+    /**
+     * _lastRev recovery in progress
+     */
+    protected static final String REV_RECOVERY_ON = "true";
 
     /**
      * Additional info, such as the process id, for support.
@@ -85,6 +107,11 @@ public class ClusterNodeInfo {
     private static final String WORKING_DIR = System.getProperty("user.dir", "");
 
     /**
+     * <b>Only Used For Testing</b>
+     */
+    private static Clock clock;
+
+    /**
      * The number of milliseconds for a lease (1 minute by default, and
      * initially).
      */
@@ -130,13 +157,35 @@ public class ClusterNodeInfo {
      */
     private String readWriteMode;
 
+    /**
+     * The state of the cluster node.
+     */
+    private String state;
+
+    /**
+     * The revLock value of the cluster node.
+     */
+    private String revRecoveryLock;
+
     ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId) {
         this.id = id;
-        this.startTime = System.currentTimeMillis();
+        this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime());
+        this.leaseEndTime = startTime;
+        this.store = store;
+        this.machineId = machineId;
+        this.instanceId = instanceId;
+    }
+
+    ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, String state,
+            String revRecoveryLock) {
+        this.id = id;
+        this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime());
         this.leaseEndTime = startTime;
         this.store = store;
         this.machineId = machineId;
         this.instanceId = instanceId;
+        this.state = state;
+        this.revRecoveryLock = revRecoveryLock;
     }
 
     public int getId() {
@@ -144,8 +193,15 @@ public class ClusterNodeInfo {
     }
 
     /**
+     * <b>Only Used For Testing</b>
+     */
+    static void setClock(Clock c) {
+        clock = c;
+    }
+
+    /**
      * Create a cluster node info instance for the store, with the
-     *
+     * 
      * @param store the document store (for the lease)
      * @return the cluster node info
      */
@@ -155,13 +211,14 @@ public class ClusterNodeInfo {
 
     /**
      * Create a cluster node info instance for the store.
-     *
+     * 
      * @param store the document store (for the lease)
      * @param machineId the machine id (null for MAC address)
      * @param instanceId the instance id (null for current working directory)
      * @return the cluster node info
      */
-    public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, String instanceId) {
+    public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
+            String instanceId) {
         if (machineId == null) {
             machineId = MACHINE_ID;
         }
@@ -174,9 +231,14 @@ public class ClusterNodeInfo {
             update.set(ID, String.valueOf(clusterNode.id));
             update.set(MACHINE_ID_KEY, clusterNode.machineId);
             update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
-            update.set(LEASE_END_KEY, System.currentTimeMillis() + clusterNode.leaseTime);
+            update.set(LEASE_END_KEY,
+                    (clock == null ? System.currentTimeMillis() : clock.getTime())
+                            + clusterNode.leaseTime);
             update.set(INFO_KEY, clusterNode.toString());
-            boolean success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
+            update.set(STATE, clusterNode.state);
+            update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock);
+            boolean success =
+                    store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
             if (success) {
                 return clusterNode;
             }
@@ -184,13 +246,15 @@ public class ClusterNodeInfo {
         throw new MicroKernelException("Could not get cluster node info");
     }
 
-    private static ClusterNodeInfo createInstance(DocumentStore store, String machineId, String instanceId) {
-        long now = System.currentTimeMillis();
+    private static ClusterNodeInfo createInstance(DocumentStore store, String machineId,
+            String instanceId) {
+        long now = (clock == null ? System.currentTimeMillis() : clock.getTime());
         // keys between "0" and "a" includes all possible numbers
         List<ClusterNodeInfoDocument> list = store.query(Collection.CLUSTER_NODES,
                 "0", "a", Integer.MAX_VALUE);
         int clusterNodeId = 0;
         int maxId = 0;
+        String state = null;
         for (Document doc : list) {
             String key = doc.getId();
             int id;
@@ -222,28 +286,30 @@ public class ClusterNodeInfo {
             if (clusterNodeId == 0 || id < clusterNodeId) {
                 // if there are multiple, use the smallest value
                 clusterNodeId = id;
+                state = (String) doc.get(STATE);
             }
         }
         if (clusterNodeId == 0) {
             clusterNodeId = maxId + 1;
         }
-        return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId);
+        return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, null);
     }
 
     /**
      * Renew the cluster id lease. This method needs to be called once in a while,
      * to ensure the same cluster id is not re-used by a different instance.
-     *
+     * 
      * @param nextCheckMillis the millisecond offset
      */
     public void renewLease(long nextCheckMillis) {
-        long now = System.currentTimeMillis();
+        long now = (clock == null ? System.currentTimeMillis() : clock.getTime());
         if (now + nextCheckMillis + nextCheckMillis < leaseEndTime) {
             return;
         }
         UpdateOp update = new UpdateOp("" + id, true);
         leaseEndTime = now + leaseTime;
         update.set(LEASE_END_KEY, leaseEndTime);
+        update.set(STATE, ACTIVE_STATE);
         ClusterNodeInfoDocument doc = store.createOrUpdate(Collection.CLUSTER_NODES, update);
         String mode = (String) doc.get(READ_WRITE_MODE_KEY);
         if (mode != null && !mode.equals(readWriteMode)) {
@@ -263,6 +329,8 @@ public class ClusterNodeInfo {
     public void dispose() {
         UpdateOp update = new UpdateOp("" + id, true);
         update.set(LEASE_END_KEY, null);
+        update.set(STATE, null);
+        update.set(REV_RECOVERY_LOCK, null);
         store.createOrUpdate(Collection.CLUSTER_NODES, update);
     }
 
@@ -273,8 +341,10 @@ public class ClusterNodeInfo {
                 "machineId: " + machineId + ",\n" +
                 "instanceId: " + instanceId + ",\n" +
                 "pid: " + PROCESS_ID + ",\n" +
-                "uuid: " + uuid +",\n" +
-                "readWriteMode: " + readWriteMode;
+                "uuid: " + uuid + ",\n" +
+                "readWriteMode: " + readWriteMode + ",\n" +
+                "state: " + state + ",\n" +
+                "revLock: " + revRecoveryLock;
     }
 
     private static long getProcessId() {
@@ -290,7 +360,7 @@ public class ClusterNodeInfo {
     /**
      * Calculate the unique machine id. This is the lowest MAC address if
      * available. As an alternative, a randomly generated UUID is used.
-     *
+     * 
      * @return the unique id
      */
     private static String getMachineId() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Apr  2 06:12:41 2014
@@ -291,6 +291,8 @@ public final class DocumentNodeStore
 
     private final Executor executor;
 
+    private final LastRevRecoveryAgent lastRevRecoveryAgent;
+
     public DocumentNodeStore(DocumentMK.Builder builder) {
         this.blobStore = builder.getBlobStore();
         if (builder.isUseSimpleRevision()) {
@@ -322,6 +324,7 @@ public final class DocumentNodeStore
         this.branches = new UnmergedBranches(getRevisionComparator());
         this.asyncDelay = builder.getAsyncDelay();
         this.versionGarbageCollector = new VersionGarbageCollector(this);
+        this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this);
         this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
             @Override
             public int getMemory() {
@@ -378,11 +381,19 @@ public final class DocumentNodeStore
                 new BackgroundOperation(this, isDisposed),
                 "DocumentNodeStore background thread");
         backgroundThread.setDaemon(true);
+        checkLastRevRecovery();
         backgroundThread.start();
 
         LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId);
     }
 
+    /**
+     * Run _lastRev recovery for this cluster node if needed.
+     */
+    private void checkLastRevRecovery() {
+        lastRevRecoveryAgent.recover(clusterId);
+    }
+
     public void dispose() {
         runBackgroundOperations();
         if (!isDisposed.getAndSet(true)) {
@@ -1707,4 +1718,8 @@ public final class DocumentNodeStore
     public VersionGarbageCollector getVersionGarbageCollector() {
         return versionGarbageCollector;
     }
+    @Nonnull
+    public LastRevRecoveryAgent getLastRevRecoveryAgent() {
+        return lastRevRecoveryAgent;
+    }
 }

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (from r1583882, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java&r1=1583882&r2=1583888&rev=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Apr  2 06:12:41 2014
@@ -19,6 +19,12 @@
 
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.mergeSorted;
+
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
@@ -27,23 +33,80 @@ import javax.annotation.CheckForNull;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
+
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.collect.ImmutableList.of;
-import static com.google.common.collect.Iterables.filter;
-import static com.google.common.collect.Iterables.mergeSorted;
 
-public class LastRevRecovery {
+/**
+ * Utility class for recovering potential missing _lastRev updates of nodes due to crash of a node.
+ */
+public class LastRevRecoveryAgent {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final DocumentNodeStore nodeStore;
 
-    public LastRevRecovery(DocumentNodeStore nodeStore) {
+    private final MissingLastRevSeeker missingLastRevUtil;
+
+    LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
         this.nodeStore = nodeStore;
+
+        if (nodeStore.getDocumentStore() instanceof MongoDocumentStore) {
+            this.missingLastRevUtil =
+                    new MongoMissingLastRevSeeker((MongoDocumentStore) nodeStore.getDocumentStore());
+        } else {
+            this.missingLastRevUtil = new MissingLastRevSeeker(nodeStore.getDocumentStore());
+        }
     }
 
+    /**
+     * Recover the correct _lastRev updates for potentially missing candidate nodes.
+     * 
+     * @param clusterId the cluster id for which the _lastRev are to be recovered
+     * @return the number of restored nodes, or 0 if no recovery was needed
+     */
+    public int recover(int clusterId) {
+        ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
+
+        if (nodeInfo != null) {
+            Long leaseEnd = (Long) (nodeInfo.get(ClusterNodeInfo.LEASE_END_KEY));
+
+            // Check if _lastRev recovery needed for this cluster node
+            // state != null (it was not cleared by dispose()) && recoveryLock not held by someone
+            if (nodeInfo.get(ClusterNodeInfo.STATE) != null
+                    && nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_LOCK) == null) {
+
+                // retrieve the root document's _lastRev
+                NodeDocument root = missingLastRevUtil.getRoot();
+                Revision lastRev = root.getLastRev().get(clusterId);
+
+                // start time is the _lastRev timestamp of this cluster node
+                long startTime = lastRev.getTimestamp();
+
+                // Endtime is the leaseEnd + the asyncDelay
+                long endTime = leaseEnd + nodeStore.getAsyncDelay();
+
+                log.info("Recovering candidates modified in time range : [{}, {}]",
+                        new Object[] {startTime, endTime});
+
+                return recoverCandidates(clusterId, startTime, endTime);
+            }
+        }
+
+        log.info("No recovery needed for clusterId {}", clusterId);
+        return 0;
+    }
+
+    /**
+     * Recover the correct _lastRev updates for the given candidate nodes.
+     * 
+     * @param suspects the potential suspects
+     * @param clusterId the cluster id for which _lastRev recovery needed
+     * @return the int
+     */
     public int recover(Iterator<NodeDocument> suspects, int clusterId) {
         UnsavedModifications unsaved = new UnsavedModifications();
         UnsavedModifications unsavedParents = new UnsavedModifications();
@@ -87,15 +150,15 @@ public class LastRevRecovery {
             }
         }
 
-        for(String parentPath : unsavedParents.getPaths()){
+        for (String parentPath : unsavedParents.getPaths()) {
             Revision calcLastRev = unsavedParents.get(parentPath);
             Revision knownLastRev = knownLastRevs.get(parentPath);
 
             //Copy the calcLastRev of parent only if they have changed
             //In many case it might happen that parent have consistent lastRev
             //This check ensures that unnecessary updates are not made
-            if(knownLastRev == null
-                    || calcLastRev.compareRevisionTime(knownLastRev) > 0){
+            if (knownLastRev == null
+                    || calcLastRev.compareRevisionTime(knownLastRev) > 0) {
                 unsaved.put(parentPath, calcLastRev);
             }
         }
@@ -105,7 +168,8 @@ public class LastRevRecovery {
         int size = unsaved.getPaths().size();
 
         if (log.isDebugEnabled()) {
-            log.debug("Last revision for following documents would be updated {}", unsaved.getPaths());
+            log.debug("Last revision for following documents would be updated {}", unsaved
+                    .getPaths());
         }
 
         //UnsavedModifications is designed to be used in concurrent
@@ -120,13 +184,47 @@ public class LastRevRecovery {
     }
 
     /**
+     * Retrieves possible candidates which have been modified in the time range and recovers the
+     * missing updates.
+     * 
+     * @param clusterId the cluster id
+     * @param startTime the start time
+     * @param endTime the end time
+     * @return the number of restored nodes
+     */
+    private int recoverCandidates(final int clusterId, final long startTime, final long endTime) {
+        // take a lock on the update process by setting the value of the lock to true
+        updateRecoveryLockOnCluster(clusterId, ClusterNodeInfo.REV_RECOVERY_ON);
+
+        Iterable<NodeDocument> suspects =
+                missingLastRevUtil.getCandidates(startTime, endTime);
+        if (log.isDebugEnabled()) {
+            log.debug("_lastRev recovery candidates : {}", suspects);
+        }
+
+        try {
+            return recover(suspects.iterator(), clusterId);
+        } finally {
+            if (suspects instanceof Closeable) {
+                try {
+                    ((Closeable) suspects).close();
+                } catch (IOException e) {
+                    log.error("Error closing iterable : ", e);
+                }
+            }
+            // Relinquish the lock on the recovery for the cluster on the clusterInfo
+            updateRecoveryLockOnCluster(clusterId, null);
+        }
+    }
+
+    /**
      * Determines the last revision value which needs to set for given clusterId
      * on the passed document. If the last rev entries are consisted
-     *
-     * @param doc       NodeDocument where lastRev entries needs to be fixed
+     * 
+     * @param doc NodeDocument where lastRev entries needs to be fixed
      * @param clusterId clusterId for which lastRev has to be checked
      * @return lastRev which needs to be updated. <tt>null</tt> if no
-     * updated is required i.e. lastRev entries are valid
+     *         updated is required i.e. lastRev entries are valid
      */
     @CheckForNull
     private Revision determineMissedLastRev(NodeDocument doc, int clusterId) {
@@ -137,34 +235,46 @@ public class LastRevRecovery {
 
         ClusterPredicate cp = new ClusterPredicate(clusterId);
 
-        //Merge sort the revs for which changes have been made
-        //to this doc
+        // Merge sort the revs for which changes have been made
+        // to this doc
 
-        //localMap always keeps the most recent valid commit entry
-        //per cluster node so looking into that should be sufficient
+        // localMap always keeps the most recent valid commit entry
+        // per cluster node so looking into that should be sufficient
         Iterable<Revision> revs = mergeSorted(of(
-                        filter(doc.getLocalCommitRoot().keySet(), cp),
-                        filter(doc.getLocalRevisions().keySet(), cp)),
+                filter(doc.getLocalCommitRoot().keySet(), cp),
+                filter(doc.getLocalRevisions().keySet(), cp)),
                 StableRevisionComparator.REVERSE
-        );
+                );
 
-        //Look for latest valid revision > currentLastRev
-        //if found then lastRev needs to be fixed
+        // Look for latest valid revision > currentLastRev
+        // if found then lastRev needs to be fixed
         for (Revision rev : revs) {
             if (rev.compareRevisionTime(currentLastRev) > 0) {
                 if (doc.isCommitted(rev)) {
                     return rev;
                 }
             } else {
-                //No valid revision found > currentLastRev
-                //indicates that lastRev is valid for given clusterId
-                //and no further checks are required
+                // No valid revision found > currentLastRev
+                // indicates that lastRev is valid for given clusterId
+                // and no further checks are required
                 break;
             }
         }
         return null;
     }
 
+    /**
+     * Set/Unset lock value on the clusterInfo for the clusterId
+     * 
+     * @param clusterId for which _lastRev recovery operation performed
+     * @param value the lock value
+     */
+    protected void updateRecoveryLockOnCluster(int clusterId, String value) {
+        UpdateOp update = new UpdateOp("" + clusterId, true);
+        update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, value);
+        nodeStore.getDocumentStore().createOrUpdate(Collection.CLUSTER_NODES, update);
+    }
+
     private static class ClusterPredicate implements Predicate<Revision> {
         private final int clusterId;
 
@@ -178,3 +288,4 @@ public class LastRevRecovery {
         }
     }
 }
+

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1583888&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Wed Apr  2 06:12:41 2014
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+
+/**
+ * Utils to retrieve _lastRev missing update candidates.
+ */
+public class MissingLastRevSeeker {
+    protected final String ROOT_PATH = "/";
+    private final DocumentStore store;
+
+    public MissingLastRevSeeker(DocumentStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Gets the cluster node info for the given cluster node id.
+     *
+     * @param clusterId the cluster id
+     * @return the cluster node info
+     */
+    public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
+        // Fetch all documents.
+        List<ClusterNodeInfoDocument> nodes = store.query(Collection.CLUSTER_NODES, "0",
+                "a", Integer.MAX_VALUE);
+        Iterable<ClusterNodeInfoDocument> clusterIterable =
+                Iterables.filter(nodes,
+                        new Predicate<ClusterNodeInfoDocument>() {
+                            // Return cluster info for the required clusterId
+                            @Override
+                            public boolean apply(ClusterNodeInfoDocument input) {
+                                String id = input.getId();
+                                return (id.equals(String.valueOf(clusterId)));
+                            }
+                        });
+
+        if (clusterIterable.iterator().hasNext()) {
+            return clusterIterable.iterator().next();
+        }
+
+        return null;
+    }
+
+    /**
+     * Get the candidates with modified time between the time range specified.
+     *
+     * @param startTime the start of the time range
+     * @param endTime the end of the time range
+     * @return the candidates
+     */
+    public Iterable<NodeDocument> getCandidates(final long startTime, final long endTime) {
+        // Fetch all documents.
+        List<NodeDocument> nodes = store.query(Collection.NODES, NodeDocument.MIN_ID_VALUE,
+                NodeDocument.MAX_ID_VALUE, Integer.MAX_VALUE);
+        return Iterables.filter(nodes, new Predicate<NodeDocument>() {
+            @Override
+            public boolean apply(NodeDocument input) {
+                Long modified = (Long) input.get(NodeDocument.MODIFIED_IN_SECS);
+                return (modified != null
+                        && (modified > TimeUnit.MILLISECONDS.toSeconds(startTime))
+                        && (modified < TimeUnit.MILLISECONDS.toSeconds(endTime)));
+            }
+        });
+    }
+
+    public NodeDocument getRoot() {
+        return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
+    }
+}
+

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java Wed Apr  2 06:12:41 2014
@@ -24,6 +24,7 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.jackrabbit.oak.stats.Clock;
 /**
  * A revision.
  */
@@ -58,6 +59,20 @@ public class Revision {
      */
     private final boolean branch;
 
+    /** Only set for testing */
+    private static Clock clock;
+
+    /**
+     * <b>
+     * Only to be used for testing.
+     * Do Not Use Otherwise
+     * </b>
+     * 
+     * @param c - the clock
+     */
+    static void setClock(Clock c) {
+        clock = c;
+    }
     public Revision(long timestamp, int counter, int clusterId) {
         this(timestamp, counter, clusterId, false);
     }
@@ -150,6 +165,9 @@ public class Revision {
      */
     public static long getCurrentTimestamp() {
         long timestamp = System.currentTimeMillis();
+        if (clock != null) {
+            timestamp = clock.getTime();
+        }
         if (timestamp < lastTimestamp) {
             // protect against decreases in the system time,
             // time machines, and other fluctuations in the time continuum

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1583888&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Wed Apr  2 06:12:41 2014
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import static com.google.common.collect.Iterables.transform;
+
+import com.google.common.base.Function;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.mongodb.ReadPreference;
+
+import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Commit;
+import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+
+/**
+ * Mongo specific version of MissingLastRevSeeker which uses mongo queries
+ * to fetch candidates which may have missed '_lastRev' updates.
+ * <p>
+ * Uses a time range to find documents modified during that interval.
+ * <p>
+ * Both queries are directed at the primary. A lagging secondary could return
+ * an outdated cluster node document or omit recently modified node documents,
+ * and recovery decisions must not be based on stale data.
+ */
+public class MongoMissingLastRevSeeker extends MissingLastRevSeeker {
+    private final MongoDocumentStore store;
+
+    /**
+     * @param store the Mongo backed document store whose collections are queried
+     */
+    public MongoMissingLastRevSeeker(MongoDocumentStore store) {
+        super(store);
+        this.store = store;
+    }
+
+    /**
+     * Looks up the cluster node document whose id equals the string form of
+     * the given cluster id.
+     *
+     * @param clusterId the cluster node id
+     * @return the matching document, or <code>null</code> if there is none
+     */
+    @Override
+    public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
+        DBObject query =
+                QueryBuilder
+                        .start(NodeDocument.ID).is(String.valueOf(clusterId)).get();
+        // read from the primary so that the returned document is not stale
+        DBCursor cursor =
+                getClusterNodeCollection().find(query)
+                        .setReadPreference(ReadPreference.primary());
+        try {
+            if (cursor.hasNext()) {
+                DBObject obj = cursor.next();
+                return store.convertFromDBObject(Collection.CLUSTER_NODES, obj);
+            }
+        } finally {
+            cursor.close();
+        }
+        return null;
+    }
+
+    /**
+     * Queries the nodes collection for documents whose
+     * {@link NodeDocument#MODIFIED_IN_SECS} value lies between
+     * <code>Commit.getModifiedInSecs(startTime)</code> and
+     * <code>Commit.getModifiedInSecs(endTime)</code>, both inclusive. The
+     * result is sorted on that field with sort order -1 (descending).
+     * <p>
+     * The underlying cursor is handed to the returned iterable; callers
+     * should close the iterable once done (presumably this releases the
+     * cursor - confirm against CloseableIterable).
+     *
+     * @param startTime lower bound of the interval (presumably in
+     *                  milliseconds - confirm against Commit.getModifiedInSecs)
+     * @param endTime upper bound of the interval, same unit as startTime
+     * @return the candidate documents, converted lazily while iterating
+     */
+    @Override
+    public CloseableIterable<NodeDocument> getCandidates(final long startTime,
+            final long endTime) {
+        DBObject query =
+                QueryBuilder
+                        .start(NodeDocument.MODIFIED_IN_SECS).lessThanEquals(
+                                Commit.getModifiedInSecs(endTime))
+                        .put(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals(
+                                Commit.getModifiedInSecs(startTime))
+                        .get();
+        DBObject sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
+
+        // read from the primary: a secondary that lags behind may not yet
+        // contain the most recently modified documents
+        DBCursor cursor =
+                getNodeCollection().find(query)
+                        .sort(sortFields)
+                        .setReadPreference(ReadPreference.primary());
+        return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() {
+            @Override
+            public NodeDocument apply(DBObject input) {
+                return store.convertFromDBObject(Collection.NODES, input);
+            }
+        }), cursor);
+    }
+
+    /** @return the Mongo collection backing {@link Collection#NODES} */
+    private DBCollection getNodeCollection() {
+        return store.getDBCollection(Collection.NODES);
+    }
+
+    /** @return the Mongo collection backing {@link Collection#CLUSTER_NODES} */
+    private DBCollection getClusterNodeCollection() {
+        return store.getDBCollection(Collection.CLUSTER_NODES);
+    }
+}
+

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java Wed Apr  2 06:12:41 2014
@@ -134,5 +134,14 @@ public abstract class DocumentStoreFixtu
                 return false;
             }
         }
+        
+        /**
+         * Drops the database addressed by <code>uri</code>. This is a
+         * best-effort cleanup: a failure to connect or to drop the database
+         * is ignored.
+         */
+        @Override
+        public void dispose() {
+            try {
+                MongoConnection connection = new MongoConnection(uri);
+                try {
+                    connection.getDB().dropDatabase();
+                } finally {
+                    // the connection was opened only for this cleanup,
+                    // release it again instead of leaking it
+                    connection.close();
+                }
+            } catch (Exception ignored) {
+                // deliberately ignored: cleanup must not fail the test run
+            }
+        }
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java Wed Apr  2 06:12:41 2014
@@ -90,7 +90,7 @@ public class LastRevRecoveryTest {
         //lastRev should not be updated for C #2
         assertNull(y1.getLastRev().get(2));
 
-        LastRevRecovery recovery = new LastRevRecovery(ds1);
+        LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1);
 
         //Do not pass y1 but still y1 should be updated
         recovery.recover(Iterators.forArray(x1,z1), 2);

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1583888&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java Wed Apr  2 06:12:41 2014
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the restore of potentially missing _lastRev updates on a single
+ * cluster node. Time is driven by a virtual {@link Clock} which gets
+ * installed into {@link Revision} and {@link ClusterNodeInfo} for the
+ * duration of each test.
+ */
+@RunWith(Parameterized.class)
+public class LastRevSingleNodeRecoveryTest {
+    private final DocumentStoreFixture fixture;
+
+    private Clock clock;
+
+    private DocumentMK mk;
+
+    public LastRevSingleNodeRecoveryTest(DocumentStoreFixture fixture) {
+        this.fixture = fixture;
+    }
+
+    /**
+     * @return the fixtures to run with; only contains the Mongo fixture and
+     *         only if it reports itself as available
+     */
+    @Parameterized.Parameters
+    public static Collection<Object[]> fixtures() throws IOException {
+        List<Object[]> fixtures = Lists.newArrayList();
+
+        DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+        if (mongo.isAvailable()) {
+            fixtures.add(new Object[] {mongo});
+        }
+        return fixtures;
+    }
+
+    /**
+     * Creates a fresh virtual clock and opens a DocumentMK on a new document
+     * store obtained from the fixture.
+     */
+    private DocumentMK createMK(int clusterId) throws InterruptedException {
+        clock = new Clock.Virtual();
+        return openMK(clusterId, fixture.createDocumentStore());
+    }
+
+    /**
+     * Opens a DocumentMK on the given store using the test clock. The opened
+     * instance is also stored in {@link #mk}.
+     */
+    private DocumentMK openMK(int clusterId, DocumentStore store) throws InterruptedException {
+        clock.waitUntil(System.currentTimeMillis());
+
+        // Sets the clock for testing
+        ClusterNodeInfo.setClock(clock);
+        Revision.setClock(clock);
+
+        DocumentMK.Builder builder = new DocumentMK.Builder();
+        builder.setAsyncDelay(0)
+                .setClusterId(clusterId)
+                .clock(clock)
+                .setDocumentStore(store);
+        mk = builder.open();
+        clock.waitUntil(Revision.getCurrentTimestamp());
+
+        return mk;
+    }
+
+    @Before
+    public void setUp() throws InterruptedException {
+        try {
+            mk = createMK(0);
+            Assume.assumeNotNull(mk);
+
+            // initialize node hierarchy
+            mk.commit("/", "+\"x\" : { \"y\": {\"z\":{} } }", null, null);
+            mk.commit("/", "+\"a\" : { \"b\": {\"c\": {}} }", null, null);
+        } catch (Exception e) {
+            // any failure while setting up skips the test instead of failing it
+            Assume.assumeNoException(e);
+        }
+    }
+
+    @Test
+    public void testLastRevRestoreOnNodeStart() throws Exception {
+        // pending updates
+        setupScenario();
+
+        // renew lease
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
+        mk.getClusterInfo().renewLease(0);
+
+        // so that the current time is more than the current lease end
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+        // Recreate mk instance, to simulate fail condition and recovery on start
+        mk = openMK(0, mk.getNodeStore().getDocumentStore());
+
+        int pendingCount = mk.getPendingWriteCount();
+
+        // so that the current time is more than the current lease end
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+        // run the recovery explicitly on the reopened instance
+        LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+        // the number of recovered updates must match the pending writes of
+        // the reopened instance
+        assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
+    }
+
+    @Test
+    public void testLastRevRestore() throws Exception {
+        setupScenario();
+
+        int pendingCount = mk.getPendingWriteCount();
+        // so that the current time is more than the current lease end
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+        LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+
+        // All pending updates should have been restored
+        assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
+    }
+
+    @Test
+    public void testNoMissingUpdates() throws Exception {
+        setupScenario();
+        mk.backgroundWrite();
+
+        // move the time forward and do another update of the root node so that only 2 nodes are
+        // candidates
+        clock.waitUntil(clock.getTime() + 5000);
+        mk.commit("/", "^\"a/key2\" : \"value2\"", null, null);
+        mk.backgroundWrite();
+
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
+        mk.getClusterInfo().renewLease(0);
+
+        // Should be 0
+        int pendingCount = mk.getPendingWriteCount();
+        LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
+        // There should have been no updates
+        assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
+    }
+
+    /**
+     * Builds the common scenario: a couple of commits that get flushed with
+     * background writes, a lease renewal, and finally the commits done by
+     * {@link #addNodes()} which are not followed by a background write.
+     */
+    private void setupScenario() throws InterruptedException {
+        // add some nodes which won't be returned
+        mk.commit("/", "+\"u\" : { \"v\": {}}", null, null);
+        mk.commit("/u", "^\"v/key1\" : \"value1\"", null, null);
+
+        // move the time forward so that the root gets updated
+        clock.waitUntil(clock.getTime() + 5000);
+        mk.commit("/", "^\"a/key1\" : \"value1\"", null, null);
+        mk.backgroundWrite();
+
+        // move the time forward to have a new node under root
+        clock.waitUntil(clock.getTime() + 5000);
+        mk.commit("/", "+\"p\":{}", null, null);
+
+        // move the time forward to write all pending changes
+        clock.waitUntil(clock.getTime() + 5000);
+        mk.backgroundWrite();
+
+        // renew lease one last time
+        clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
+        mk.getClusterInfo().renewLease(0);
+
+        clock.waitUntil(clock.getTime() + 5000);
+        // add nodes won't trigger _lastRev updates
+        addNodes();
+    }
+
+    /**
+     * Commits a handful of changes below /a/b and /x/y. No background write
+     * is performed here, so the related _lastRev updates are expected to
+     * remain pending.
+     */
+    private void addNodes() {
+        // change node /a/b/c by adding a property
+        mk.commit("/a/b", "^\"c/key1\" : \"value1\"", null, null);
+        // add node /a/b/c/d
+        mk.commit("/a/b/c", "+\"d\":{}", null, null);
+        // add node /a/b/f
+        mk.commit("/a/b", "+\"f\" : {}", null, null);
+        // add node /a/b/f/e
+        mk.commit("/a/b/f", "+\"e\": {}", null, null);
+        // change node /x/y/z
+        mk.commit("/x/y", "^\"z/key1\" : \"value1\"", null, null);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Revision.setClock(null);
+        ClusterNodeInfo.setClock(null);
+        try {
+            // mk is still null when setUp bailed out before opening it
+            if (mk != null) {
+                mk.dispose();
+            }
+        } finally {
+            // always drop the test database, even if disposing mk failed
+            fixture.dispose();
+        }
+    }
+}
+

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native