You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2014/04/15 14:51:25 UTC

svn commit: r1587560 - in /jackrabbit/oak/branches/1.0: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/

Author: mreutegg
Date: Tue Apr 15 12:51:24 2014
New Revision: 1587560

URL: http://svn.apache.org/r1587560
Log:
OAK-1732: Cluster node lease not renewed in time

Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1587538

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1587560&r1=1587559&r2=1587560&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Tue Apr 15 12:51:24 2014
@@ -363,12 +363,12 @@ public class ClusterNodeInfo {
     /**
      * Renew the cluster id lease. This method needs to be called once in a while,
      * to ensure the same cluster id is not re-used by a different instance.
-     * 
-     * @param nextCheckMillis the millisecond offset
+     * The lease is only renewed once half of the lease time has passed. That is,
+     * with a lease time of 60 seconds, the lease is renewed every 30 seconds.
      */
-    public void renewLease(long nextCheckMillis) {
+    public void renewLease() {
         long now = getCurrentTime();
-        if (now + nextCheckMillis + nextCheckMillis < leaseEndTime) {
+        if (now + leaseTime / 2 < leaseEndTime) {
             return;
         }
         UpdateOp update = new UpdateOp("" + id, true);

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1587560&r1=1587559&r2=1587560&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Apr 15 12:51:24 2014
@@ -211,6 +211,12 @@ public final class DocumentNodeStore
     private Thread backgroundThread;
 
     /**
+     * Background thread performing the clusterId lease renewal.
+     * Will be {@code null} if {@link #clusterNodeInfo} is {@code null}.
+     */
+    private Thread leaseUpdateThread;
+
+    /**
      * Read/Write lock for background operations. Regular commits will acquire
      * a shared lock, while a background write acquires an exclusive lock.
      */
@@ -384,10 +390,17 @@ public final class DocumentNodeStore
         backgroundThread.setDaemon(true);
         checkLastRevRecovery();
         // Renew the lease because it may have been stale
-        backgroundRenewClusterIdLease();
+        renewClusterIdLease();
 
         backgroundThread.start();
 
+        if (clusterNodeInfo != null) {
+            leaseUpdateThread = new Thread(
+                    new BackgroundLeaseUpdate(this, isDisposed),
+                    "DocumentNodeStore lease update thread");
+            leaseUpdateThread.start();
+        }
+
         LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId);
     }
 
@@ -409,6 +422,13 @@ public final class DocumentNodeStore
             } catch (InterruptedException e) {
                 // ignore
             }
+            if (leaseUpdateThread != null) {
+                try {
+                    leaseUpdateThread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
             if (clusterNodeInfo != null) {
                 clusterNodeInfo.dispose();
             }
@@ -1307,7 +1327,6 @@ public final class DocumentNodeStore
         if (isDisposed.get()) {
             return;
         }
-        backgroundRenewClusterIdLease();
         if (simpleRevisionCounter != null) {
             // only when using timestamp
             return;
@@ -1330,11 +1349,11 @@ public final class DocumentNodeStore
         }
     }
 
-    private void backgroundRenewClusterIdLease() {
+    void renewClusterIdLease() {
         if (clusterNodeInfo == null) {
             return;
         }
-        clusterNodeInfo.renewLease(asyncDelay);
+        clusterNodeInfo.renewLease();
     }
 
     /**
@@ -1651,20 +1670,19 @@ public final class DocumentNodeStore
         return blobGC;
     }
 
-    /**
-     * A background thread.
-     */
-    static class BackgroundOperation implements Runnable {
+    static abstract class NodeStoreTask implements Runnable {
         final WeakReference<DocumentNodeStore> ref;
         private final AtomicBoolean isDisposed;
         private int delay;
 
-        BackgroundOperation(DocumentNodeStore nodeStore, AtomicBoolean isDisposed) {
+        NodeStoreTask(DocumentNodeStore nodeStore, AtomicBoolean isDisposed) {
             ref = new WeakReference<DocumentNodeStore>(nodeStore);
             delay = nodeStore.getAsyncDelay();
             this.isDisposed = isDisposed;
         }
 
+        protected abstract void execute(@Nonnull DocumentNodeStore nodeStore);
+
         @Override
         public void run() {
             while (delay != 0 && !isDisposed.get()) {
@@ -1678,7 +1696,7 @@ public final class DocumentNodeStore
                 DocumentNodeStore nodeStore = ref.get();
                 if (nodeStore != null) {
                     try {
-                        nodeStore.runBackgroundOperations();
+                        execute(nodeStore);
                     } catch (Throwable t) {
                         LOG.warn("Background operation failed: " + t.toString(), t);
                     }
@@ -1691,6 +1709,35 @@ public final class DocumentNodeStore
         }
     }
 
+    /**
+     * Task that runs the background operations of a DocumentNodeStore.
+     */
+    static class BackgroundOperation extends NodeStoreTask {
+
+        BackgroundOperation(DocumentNodeStore nodeStore,
+                            AtomicBoolean isDisposed) {
+            super(nodeStore, isDisposed);
+        }
+
+        @Override
+        protected void execute(@Nonnull DocumentNodeStore nodeStore) {
+            nodeStore.runBackgroundOperations();
+        }
+    }
+
+    static class BackgroundLeaseUpdate extends NodeStoreTask {
+
+        BackgroundLeaseUpdate(DocumentNodeStore nodeStore,
+                              AtomicBoolean isDisposed) {
+            super(nodeStore, isDisposed);
+        }
+
+        @Override
+        protected void execute(@Nonnull DocumentNodeStore nodeStore) {
+            nodeStore.renewClusterIdLease();
+        }
+    }
+
     public BlobStore getBlobStore() {
         return blobStore;
     }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java?rev=1587560&r1=1587559&r2=1587560&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java Tue Apr 15 12:51:24 2014
@@ -19,7 +19,9 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
@@ -75,14 +77,14 @@ public class ClusterInfoTest {
         op = new UpdateOp(list.get(0).getId(), false);
         op.set("readWriteMode", "read:xyz, write:abc");
         mem.findAndUpdate(Collection.CLUSTER_NODES, op);
-        ns1.runBackgroundOperations();
+        ns1.renewClusterIdLease();
         assertEquals(ReadPreference.primary(), mem.getReadPreference());
         assertEquals(WriteConcern.MAJORITY, mem.getWriteConcern());
 
         op = new UpdateOp(list.get(0).getId(), false);
         op.set("readWriteMode", "read:nearest, write:fsynced");
         mem.findAndUpdate(Collection.CLUSTER_NODES, op);
-        ns1.runBackgroundOperations();
+        ns1.renewClusterIdLease();
         assertEquals(ReadPreference.nearest(), mem.getReadPreference());
         assertEquals(WriteConcern.FSYNCED, mem.getWriteConcern());
 
@@ -90,6 +92,49 @@ public class ClusterInfoTest {
         ns2.dispose();
     }
 
+    @Test
+    public void renewLease() throws InterruptedException {
+        MemoryDocumentStore mem = new MemoryDocumentStore();
+        Clock clock = new Clock.Virtual();
+        clock.waitUntil(System.currentTimeMillis());
+        ClusterNodeInfo.setClock(clock);
+
+        DocumentNodeStore ns = new DocumentMK.Builder().
+                setDocumentStore(mem).
+                setAsyncDelay(0).
+                getNodeStore();
+
+        ClusterNodeInfo info = ns.getClusterInfo();
+        assertNotNull(info);
+
+        // current lease end
+        long leaseEnd = getLeaseEndTime(ns);
+
+        // wait a bit, but not more than half of the lease time
+        clock.waitUntil(clock.getTime() + (ns.getClusterInfo().getLeaseTime() / 2) - 1000);
+
+        // must not renew lease right now
+        ns.renewClusterIdLease();
+        assertEquals(leaseEnd, getLeaseEndTime(ns));
+
+        // wait some more time
+        clock.waitUntil(clock.getTime() + 2000);
+
+        // now the lease must be renewed
+        ns.renewClusterIdLease();
+        assertTrue(getLeaseEndTime(ns) > leaseEnd);
+
+        ns.dispose();
+    }
+
+    private static long getLeaseEndTime(DocumentNodeStore nodeStore) {
+        ClusterNodeInfoDocument doc = nodeStore.getDocumentStore().find(
+                Collection.CLUSTER_NODES,
+                String.valueOf(nodeStore.getClusterId()));
+        assertNotNull(doc);
+        return doc.getLeaseEndTime();
+    }
+
     @After
     public void tearDown(){
         ClusterNodeInfo.resetClockToDefault();

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java?rev=1587560&r1=1587559&r2=1587560&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java Tue Apr 15 12:51:24 2014
@@ -131,7 +131,7 @@ public class ClusterTest {
         assertEquals(1, c1.getId());
         c1.setLeaseTime(1);
         // this will quickly expire
-        c1.renewLease(1);
+        c1.renewLease();
         Thread.sleep(10);
         c2 = ClusterNodeInfo.getInstance(store, "m1", null);
         assertEquals(1, c2.getId());

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java?rev=1587560&r1=1587559&r2=1587560&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java Tue Apr 15 12:51:24 2014
@@ -125,7 +125,7 @@ public class LastRevRecoveryAgentTest {
         clock.waitUntil(clock.getTime() + leaseTime + 10);
 
         //Renew the lease for C1
-        ds1.getClusterInfo().renewLease(3*leaseTime);
+        ds1.getClusterInfo().renewLease();
 
         assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
 

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1587560&r1=1587559&r2=1587560&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java Tue Apr 15 12:51:24 2014
@@ -106,7 +106,7 @@ public class LastRevSingleNodeRecoveryTe
 
         // renew lease
         clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
-        mk.getClusterInfo().renewLease(0);
+        mk.getClusterInfo().renewLease();
 
         // so that the current time is more than the current lease end
         clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
@@ -148,7 +148,7 @@ public class LastRevSingleNodeRecoveryTe
         mk.backgroundWrite();
 
         clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
-        mk.getClusterInfo().renewLease(0);
+        mk.getClusterInfo().renewLease();
 
         // Should be 0
         int pendingCount = mk.getPendingWriteCount();
@@ -194,7 +194,7 @@ public class LastRevSingleNodeRecoveryTe
 
         // renew lease one last time
         clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
-        mk.getClusterInfo().renewLease(0);
+        mk.getClusterInfo().renewLease();
 
         clock.waitUntil(clock.getTime() + 5000);
         // add nodes won't trigger _lastRev updates