You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/09/13 08:21:57 UTC

svn commit: r1760492 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java

Author: mreutegg
Date: Tue Sep 13 08:21:57 2016
New Revision: 1760492

URL: http://svn.apache.org/viewvc?rev=1760492&view=rev
Log:
OAK-4770: Missing exception handling in ClusterNodeInfo.renewLease()

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1760492&r1=1760491&r2=1760492&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Tue Sep 13 08:21:57 2016
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
 import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
 
 import java.lang.management.ManagementFactory;
@@ -29,6 +30,9 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
 
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -364,7 +368,7 @@ public class ClusterNodeInfo {
      * @return the cluster node info
      */
     public static ClusterNodeInfo getReadOnlyInstance(DocumentStore store) {
-        return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, ClusterNodeState.ACTIVE,
+        return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, ACTIVE,
                 RecoverLockState.NONE, null, true) {
             @Override
             public void dispose() {
@@ -785,6 +789,7 @@ public class ClusterNodeInfo {
         }
         // lease requires renewal
 
+        long updatedLeaseEndTime;
         synchronized(this) {
             // this is synchronized since access to leaseCheckFailed and leaseEndTime
             // are both normally synchronized to propagate values between renewLease()
@@ -799,12 +804,12 @@ public class ClusterNodeInfo {
             // synchronized could have delayed the 'now', so
             // set it again..
             now = getCurrentTime();
-            leaseEndTime = now + leaseTime;
+            updatedLeaseEndTime = now + leaseTime;
         }
 
         UpdateOp update = new UpdateOp("" + id, false);
-        update.set(LEASE_END_KEY, leaseEndTime);
-        update.set(STATE, ClusterNodeState.ACTIVE.name());
+        update.set(LEASE_END_KEY, updatedLeaseEndTime);
+        update.set(STATE, ACTIVE.name());
 
         if (renewed && !leaseCheckDisabled) {
             // if leaseCheckDisabled, then we just update the lease without
@@ -818,7 +823,7 @@ public class ClusterNodeInfo {
             // make two assertions: the leaseEnd must match ..
             update.equals(LEASE_END_KEY, null, previousLeaseEndTime);
             // plus it must still be active ..
-            update.equals(STATE, null, ClusterNodeState.ACTIVE.name());
+            update.equals(STATE, null, ACTIVE.name());
             // plus it must not have a recovery lock on it
             update.notEquals(REV_RECOVERY_LOCK, RecoverLockState.ACQUIRED.name());
             // @TODO: to make it 100% failure proof we could introduce
@@ -830,56 +835,126 @@ public class ClusterNodeInfo {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Renewing lease for cluster id " + id + " with UpdateOp " + update);
         }
-        ClusterNodeInfoDocument doc = store.findAndUpdate(Collection.CLUSTER_NODES, update);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Lease renewal for cluster id " + id + " resulted in: " + doc);
-        }
- 
-        if (doc == null) { // should not occur when leaseCheckDisabled
-            // OAK-3398 : someone else either started recovering or is already through with that.
-            // in both cases the local instance lost the lease-update-game - and hence
-            // should behave and must consider itself as 'lease failed'
+        Stopwatch sw = Stopwatch.createStarted();
+        DocumentStoreException dse;
+        Object result = null;
+        try {
+            ClusterNodeInfoDocument doc = store.findAndUpdate(Collection.CLUSTER_NODES, update);
+            result = doc;
+
+            if (doc == null) { // should not occur when leaseCheckDisabled
+                // OAK-3398 : someone else either started recovering or is already through with that.
+                // in both cases the local instance lost the lease-update-game - and hence
+                // should behave and must consider itself as 'lease failed'
+
+                synchronized(this) {
+                    if (leaseCheckFailed) {
+                        // the lease check failure was already detected elsewhere in
+                        // this instance - nothing more to handle here, so we just log/throw
+                        throw leaseExpired(LEASE_CHECK_FAILED_MSG, true);
+                    }
+                    leaseCheckFailed = true; // make sure only one thread 'wins', ie goes any further
+                }
 
-            synchronized(this) {
-                if (leaseCheckFailed) {
-                    // somehow the instance figured out otherwise that the
-                    // lease check failed - so we don't have to too - so we just log/throw
-                    throw leaseExpired(LEASE_CHECK_FAILED_MSG, true);
+                String errorMsg = LEASE_CHECK_FAILED_MSG
+                        + " (Could not update lease anymore, someone else in the cluster "
+                        + "must have noticed this instance' slowness already. "
+                        + "Going to invoke leaseFailureHandler!)";
+
+                // try to add more diagnostics
+                try {
+                    ClusterNodeInfoDocument current = store.find(Collection.CLUSTER_NODES, "" + id);
+                    if (current != null) {
+                        Object leaseEnd = current.get(LEASE_END_KEY);
+                        Object recoveryLock = current.get(REV_RECOVERY_LOCK);
+                        Object recoveryBy = current.get(REV_RECOVERY_BY);
+                        Object cnState = current.get(STATE);
+                        errorMsg += " (leaseEnd: " + leaseEnd + " (expected: " + leaseEndTime + ")" +
+                                ", (state: " + cnState + " (expected: " + ACTIVE.name() + ")" +
+                                ", recoveryLock: " + recoveryLock +
+                                ", recoveryBy: " + recoveryBy + ")";
+                    }
+                } catch (DocumentStoreException ex) {
+                    LOG.error("trying to read ClusterNodeInfo for cluster id " + id, ex);
                 }
-                leaseCheckFailed = true; // make sure only one thread 'wins', ie goes any further
-            }
 
-            String errorMsg = LEASE_CHECK_FAILED_MSG + " (Could not update lease anymore, someone else in the cluster "
-                    + "must have noticed this instance' slowness already. " + "Going to invoke leaseFailureHandler!)";
+                LOG.error(errorMsg);
 
-            // try to add more diagnostics
+                handleLeaseFailure(errorMsg);
+                // should never be reached: handleLeaseFailure throws a DocumentStoreException
+                return false;
+            }
+            leaseEndTime = updatedLeaseEndTime;
+            previousLeaseEndTime = leaseEndTime; // store previousLeaseEndTime for reference for next time
+            String mode = (String) doc.get(READ_WRITE_MODE_KEY);
+            if (mode != null && !mode.equals(readWriteMode)) {
+                readWriteMode = mode;
+                store.setReadWriteMode(mode);
+            }
+            renewed = true;
+            return true;
+        } catch (DocumentStoreException e) {
+            dse = e;
+            result = e.toString();
+        } finally {
+            sw.stop();
+            String msg = "Lease renewal for cluster id {} took {}, resulted in: {}";
+            if (sw.elapsed(TimeUnit.SECONDS) > 10) {
+                LOG.warn(msg, id, sw, result);
+            } else if (sw.elapsed(TimeUnit.SECONDS) > 1) {
+                LOG.info(msg, id, sw, result);
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug(msg, id, sw, result);
+            }
+        }
+        // if we get here, the update failed with an exception. try to read the
+        // current cluster node info document and update leaseEndTime &
+        // previousLeaseEndTime accordingly, retrying until updatedLeaseEndTime is reached
+        while (getCurrentTime() < updatedLeaseEndTime) {
+            synchronized (this) {
+                if (leaseCheckFailed) {
+                    // no need to read from store, lease check already failed
+                    break;
+                }
+            }
+            long t1 = clock.getTime();
+            ClusterNodeInfoDocument doc;
             try {
-                ClusterNodeInfoDocument current = store.find(Collection.CLUSTER_NODES, "" + id);
-                if (current != null) {
-                    Object leaseEnd = current.get(LEASE_END_KEY);
-                    Object recoveryLock = current.get(REV_RECOVERY_LOCK);
-                    Object recoveryBy = current.get(REV_RECOVERY_BY);
-                    errorMsg += " (leaseEnd: " + leaseEnd + " (expected: " + leaseEndTime + "), recoveryLock: " + recoveryLock
-                            + ", recoveryBy: " + recoveryBy + ")";
+                doc = store.find(Collection.CLUSTER_NODES, String.valueOf(id));
+            } catch (DocumentStoreException e) {
+                LOG.warn("Reading ClusterNodeInfoDocument for id " + id + " failed", e);
+                // do not retry more than once a second
+                try {
+                    clock.waitUntil(t1 + 1000);
+                } catch (InterruptedException iex) {
+                    // ignore
                 }
-            } catch (DocumentStoreException ex) {
-                LOG.error("trying to read ClusterNodeInfo for cluster id " + id, ex);
+                continue;
+            }
+            if (doc != null) {
+                if (!doc.isActive()) {
+                    LOG.warn("ClusterNodeInfoDocument for id {} is not active " +
+                            "anymore. {}", id, doc);
+                    // break here and let the next lease update attempt fail
+                    break;
+                } else if (doc.getLeaseEndTime() == previousLeaseEndTime
+                        || doc.getLeaseEndTime() == updatedLeaseEndTime) {
+                    // set lease end times to current values
+                    previousLeaseEndTime = doc.getLeaseEndTime();
+                    leaseEndTime = doc.getLeaseEndTime();
+                    break;
+                } else {
+                    // leaseEndTime is neither the previous nor the new value
+                    // another cluster node must have updated the leaseEndTime
+                    // break here and let the next lease update attempt fail
+                    break;
+                }
+            } else {
+                LOG.warn("ClusterNodeInfoDocument for id {} does not exist anymore", id);
+                break;
             }
-
-            LOG.error(errorMsg);
-
-            handleLeaseFailure(errorMsg);
-            // should never be reached: handleLeaseFailure throws a DocumentStoreException
-            return false;
-        }
-        previousLeaseEndTime = leaseEndTime; // store previousLeaseEndTime for reference for next time
-        String mode = (String) doc.get(READ_WRITE_MODE_KEY);
-        if (mode != null && !mode.equals(readWriteMode)) {
-            readWriteMode = mode;
-            store.setReadWriteMode(mode);
         }
-        renewed = true;
-        return true;
+        throw dse;
     }
     
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java?rev=1760492&r1=1760491&r2=1760492&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java Tue Sep 13 08:21:57 2016
@@ -23,7 +23,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -82,7 +81,6 @@ public class ClusterNodeInfoTest {
     }
 
     // OAK-4770
-    @Ignore
     @Test
     public void renewLeaseExceptionAfter() throws Exception {
         ClusterNodeInfo info = newClusterNodeInfo(1);
@@ -105,6 +103,92 @@ public class ClusterNodeInfoTest {
     }
 
     @Test
+    public void renewLeaseExceptionBeforeWithDelay() throws Exception {
+        ClusterNodeInfo info = newClusterNodeInfo(1);
+        waitLeaseUpdateInterval();
+        store.setFailBeforeUpdate(1);
+        // delay operations by half the lease time, this will
+        // first delay the update and then delay the subsequent
+        // find because of the exception on update. afterwards the
+        // lease must be expired
+        store.setDelayMillis(info.getLeaseTime() / 2);
+        try {
+            info.renewLease();
+            fail("must throw DocumentStoreException");
+        } catch (DocumentStoreException e) {
+            // expected
+        }
+        assertTrue(info.getLeaseEndTime() < clock.getTime());
+    }
+
+    @Test
+    public void renewLeaseExceptionAfterWithDelay() throws Exception {
+        ClusterNodeInfo info = newClusterNodeInfo(1);
+        long leaseEnd = info.getLeaseEndTime();
+        waitLeaseUpdateInterval();
+        store.setFailAfterUpdate(1);
+        // delay operations by half the lease time, this will
+        // first delay the update and then delay the subsequent
+        // find because of the exception on update. afterwards
+        // the leaseEnd must reflect the updated value
+        store.setDelayMillis(info.getLeaseTime() / 2);
+        try {
+            info.renewLease();
+            fail("must throw DocumentStoreException");
+        } catch (DocumentStoreException e) {
+            // expected
+        }
+        assertTrue(info.getLeaseEndTime() > leaseEnd);
+    }
+
+    @Test
+    public void renewLeaseExceptionAfterFindFails() throws Exception {
+        ClusterNodeInfo info = newClusterNodeInfo(1);
+        long leaseEnd = info.getLeaseEndTime();
+        waitLeaseUpdateInterval();
+        store.setFailAfterUpdate(1);
+        store.setFailFind(1);
+        // delay operations by half the lease time, this will
+        // first delay the update and then delay and fail the
+        // subsequent find once.
+        store.setDelayMillis(info.getLeaseTime() / 2);
+        try {
+            info.renewLease();
+            fail("must throw DocumentStoreException");
+        } catch (DocumentStoreException e) {
+            // expected
+        }
+        assertEquals(0, store.getFailFind());
+        // must not reflect the updated value, because retries
+        // to read the current cluster node info document stop
+        // once the lease expires
+        assertEquals(leaseEnd, info.getLeaseEndTime());
+    }
+
+    @Test
+    public void renewLeaseExceptionAfterFindSucceedsEventually() throws Exception {
+        ClusterNodeInfo info = newClusterNodeInfo(1);
+        waitLeaseUpdateInterval();
+        // delay operations by a sixth of the lease time, this will
+        // first delay the update and then delay and fail the
+        // subsequent find calls. find retries should eventually
+        // succeed within the lease time
+        store.setDelayMillis(info.getLeaseTime() / 6);
+        store.setFailAfterUpdate(1);
+        store.setFailFind(3);
+        try {
+            info.renewLease();
+            fail("must throw DocumentStoreException");
+        } catch (DocumentStoreException e) {
+            // expected
+        }
+        // the three retries must eventually succeed within the lease time
+        assertEquals(0, store.getFailFind());
+        // must reflect the updated value
+        assertTrue(info.getLeaseEndTime() > clock.getTime());
+    }
+
+    @Test
     public void renewLeaseDelayed() throws Exception {
         ClusterNodeInfo info = newClusterNodeInfo(1);
         clock.waitUntil(info.getLeaseEndTime() + ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
@@ -142,6 +226,26 @@ public class ClusterNodeInfoTest {
         }
     }
 
+    @Test
+    public void renewLeaseTimedOutWithCheck() throws Exception {
+        ClusterNodeInfo info = newClusterNodeInfo(1);
+        // wait until after lease end
+        clock.waitUntil(info.getLeaseEndTime() + ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
+        try {
+            info.performLeaseCheck();
+            fail("lease check must fail with exception");
+        } catch (DocumentStoreException e) {
+            // expected
+        }
+        // cluster node 1 must not be able to renew the lease now
+        try {
+            // must either return false
+            assertFalse(info.renewLease());
+        } catch (DocumentStoreException e) {
+            // or throw an exception
+        }
+    }
+
     private void recoverClusterNode(int clusterId) throws Exception {
         DocumentNodeStore ns = new DocumentMK.Builder()
                 .setDocumentStore(store.getStore()) // use unwrapped store
@@ -183,6 +287,7 @@ public class ClusterNodeInfoTest {
 
         private final AtomicInteger failBeforeUpdate = new AtomicInteger();
         private final AtomicInteger failAfterUpdate = new AtomicInteger();
+        private final AtomicInteger failFind = new AtomicInteger();
         private long delayMillis;
 
         TestStore() {
@@ -203,6 +308,14 @@ public class ClusterNodeInfoTest {
             return doc;
         }
 
+        @Override
+        public <T extends Document> T find(Collection<T> collection,
+                                           String key) {
+            maybeDelay();
+            maybeThrow(failFind, "find failed");
+            return super.find(collection, key);
+        }
+
         private void maybeDelay() {
             try {
                 clock.waitUntil(clock.getTime() + delayMillis);
@@ -241,5 +354,13 @@ public class ClusterNodeInfoTest {
         public void setDelayMillis(long delayMillis) {
             this.delayMillis = delayMillis;
         }
+
+        public int getFailFind() {
+            return failFind.get();
+        }
+
+        public void setFailFind(int num) {
+            this.failFind.set(num);
+        }
     }
 }