You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/09/13 08:21:57 UTC
svn commit: r1760492 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java
Author: mreutegg
Date: Tue Sep 13 08:21:57 2016
New Revision: 1760492
URL: http://svn.apache.org/viewvc?rev=1760492&view=rev
Log:
OAK-4770: Missing exception handling in ClusterNodeInfo.renewLease()
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1760492&r1=1760491&r2=1760492&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Tue Sep 13 08:21:57 2016
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
import java.lang.management.ManagementFactory;
@@ -29,6 +30,9 @@ import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -364,7 +368,7 @@ public class ClusterNodeInfo {
* @return the cluster node info
*/
public static ClusterNodeInfo getReadOnlyInstance(DocumentStore store) {
- return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, ClusterNodeState.ACTIVE,
+ return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, ACTIVE,
RecoverLockState.NONE, null, true) {
@Override
public void dispose() {
@@ -785,6 +789,7 @@ public class ClusterNodeInfo {
}
// lease requires renewal
+ long updatedLeaseEndTime;
synchronized(this) {
// this is synchronized since access to leaseCheckFailed and leaseEndTime
// are both normally synchronized to propagate values between renewLease()
@@ -799,12 +804,12 @@ public class ClusterNodeInfo {
// synchronized could have delayed the 'now', so
// set it again..
now = getCurrentTime();
- leaseEndTime = now + leaseTime;
+ updatedLeaseEndTime = now + leaseTime;
}
UpdateOp update = new UpdateOp("" + id, false);
- update.set(LEASE_END_KEY, leaseEndTime);
- update.set(STATE, ClusterNodeState.ACTIVE.name());
+ update.set(LEASE_END_KEY, updatedLeaseEndTime);
+ update.set(STATE, ACTIVE.name());
if (renewed && !leaseCheckDisabled) {
// if leaseCheckDisabled, then we just update the lease without
@@ -818,7 +823,7 @@ public class ClusterNodeInfo {
// make two assertions: the leaseEnd must match ..
update.equals(LEASE_END_KEY, null, previousLeaseEndTime);
// plus it must still be active ..
- update.equals(STATE, null, ClusterNodeState.ACTIVE.name());
+ update.equals(STATE, null, ACTIVE.name());
// plus it must not have a recovery lock on it
update.notEquals(REV_RECOVERY_LOCK, RecoverLockState.ACQUIRED.name());
// @TODO: to make it 100% failure proof we could introduce
@@ -830,56 +835,126 @@ public class ClusterNodeInfo {
if (LOG.isDebugEnabled()) {
LOG.debug("Renewing lease for cluster id " + id + " with UpdateOp " + update);
}
- ClusterNodeInfoDocument doc = store.findAndUpdate(Collection.CLUSTER_NODES, update);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lease renewal for cluster id " + id + " resulted in: " + doc);
- }
-
- if (doc == null) { // should not occur when leaseCheckDisabled
- // OAK-3398 : someone else either started recovering or is already through with that.
- // in both cases the local instance lost the lease-update-game - and hence
- // should behave and must consider itself as 'lease failed'
+ Stopwatch sw = Stopwatch.createStarted();
+ DocumentStoreException dse;
+ Object result = null;
+ try {
+ ClusterNodeInfoDocument doc = store.findAndUpdate(Collection.CLUSTER_NODES, update);
+ result = doc;
+
+ if (doc == null) { // should not occur when leaseCheckDisabled
+ // OAK-3398 : someone else either started recovering or is already through with that.
+ // in both cases the local instance lost the lease-update-game - and hence
+ // should behave and must consider itself as 'lease failed'
+
+ synchronized(this) {
+ if (leaseCheckFailed) {
+ // the instance has already figured out by other means that the
+ // lease check failed - no need to handle it again here, so we just log/throw
+ throw leaseExpired(LEASE_CHECK_FAILED_MSG, true);
+ }
+ leaseCheckFailed = true; // make sure only one thread 'wins', ie goes any further
+ }
- synchronized(this) {
- if (leaseCheckFailed) {
- // somehow the instance figured out otherwise that the
- // lease check failed - so we don't have to too - so we just log/throw
- throw leaseExpired(LEASE_CHECK_FAILED_MSG, true);
+ String errorMsg = LEASE_CHECK_FAILED_MSG
+ + " (Could not update lease anymore, someone else in the cluster "
+ + "must have noticed this instance' slowness already. "
+ + "Going to invoke leaseFailureHandler!)";
+
+ // try to add more diagnostics
+ try {
+ ClusterNodeInfoDocument current = store.find(Collection.CLUSTER_NODES, "" + id);
+ if (current != null) {
+ Object leaseEnd = current.get(LEASE_END_KEY);
+ Object recoveryLock = current.get(REV_RECOVERY_LOCK);
+ Object recoveryBy = current.get(REV_RECOVERY_BY);
+ Object cnState = current.get(STATE);
+ errorMsg += " (leaseEnd: " + leaseEnd + " (expected: " + leaseEndTime + ")" +
+ ", (state: " + cnState + " (expected: " + ACTIVE.name() + ")" +
+ ", recoveryLock: " + recoveryLock +
+ ", recoveryBy: " + recoveryBy + ")";
+ }
+ } catch (DocumentStoreException ex) {
+ LOG.error("trying to read ClusterNodeInfo for cluster id " + id, ex);
}
- leaseCheckFailed = true; // make sure only one thread 'wins', ie goes any further
- }
- String errorMsg = LEASE_CHECK_FAILED_MSG + " (Could not update lease anymore, someone else in the cluster "
- + "must have noticed this instance' slowness already. " + "Going to invoke leaseFailureHandler!)";
+ LOG.error(errorMsg);
- // try to add more diagnostics
+ handleLeaseFailure(errorMsg);
+ // should never be reached: handleLeaseFailure throws a DocumentStoreException
+ return false;
+ }
+ leaseEndTime = updatedLeaseEndTime;
+ previousLeaseEndTime = leaseEndTime; // store previousLeaseEndTime for reference for next time
+ String mode = (String) doc.get(READ_WRITE_MODE_KEY);
+ if (mode != null && !mode.equals(readWriteMode)) {
+ readWriteMode = mode;
+ store.setReadWriteMode(mode);
+ }
+ renewed = true;
+ return true;
+ } catch (DocumentStoreException e) {
+ dse = e;
+ result = e.toString();
+ } finally {
+ sw.stop();
+ String msg = "Lease renewal for cluster id {} took {}, resulted in: {}";
+ if (sw.elapsed(TimeUnit.SECONDS) > 10) {
+ LOG.warn(msg, id, sw, result);
+ } else if (sw.elapsed(TimeUnit.SECONDS) > 1) {
+ LOG.info(msg, id, sw, result);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, id, sw, result);
+ }
+ }
+ // if we get here, the update failed with an exception, try to read the
+ // current cluster node info document and update leaseEndTime &
+ // previousLeaseEndTime accordingly until leaseEndTime is reached
+ while (getCurrentTime() < updatedLeaseEndTime) {
+ synchronized (this) {
+ if (leaseCheckFailed) {
+ // no need to read from store, lease check already failed
+ break;
+ }
+ }
+ long t1 = clock.getTime();
+ ClusterNodeInfoDocument doc;
try {
- ClusterNodeInfoDocument current = store.find(Collection.CLUSTER_NODES, "" + id);
- if (current != null) {
- Object leaseEnd = current.get(LEASE_END_KEY);
- Object recoveryLock = current.get(REV_RECOVERY_LOCK);
- Object recoveryBy = current.get(REV_RECOVERY_BY);
- errorMsg += " (leaseEnd: " + leaseEnd + " (expected: " + leaseEndTime + "), recoveryLock: " + recoveryLock
- + ", recoveryBy: " + recoveryBy + ")";
+ doc = store.find(Collection.CLUSTER_NODES, String.valueOf(id));
+ } catch (DocumentStoreException e) {
+ LOG.warn("Reading ClusterNodeInfoDocument for id " + id + " failed", e);
+ // do not retry more than once a second
+ try {
+ clock.waitUntil(t1 + 1000);
+ } catch (InterruptedException iex) {
+ // ignore
}
- } catch (DocumentStoreException ex) {
- LOG.error("trying to read ClusterNodeInfo for cluster id " + id, ex);
+ continue;
+ }
+ if (doc != null) {
+ if (!doc.isActive()) {
+ LOG.warn("ClusterNodeInfoDocument for id {} is not active " +
+ "anymore. {}", id, doc);
+ // break here and let the next lease update attempt fail
+ break;
+ } else if (doc.getLeaseEndTime() == previousLeaseEndTime
+ || doc.getLeaseEndTime() == updatedLeaseEndTime) {
+ // set lease end times to current values
+ previousLeaseEndTime = doc.getLeaseEndTime();
+ leaseEndTime = doc.getLeaseEndTime();
+ break;
+ } else {
+ // leaseEndTime is neither the previous nor the new value
+ // another cluster node must have updated the leaseEndTime
+ // break here and let the next lease update attempt fail
+ break;
+ }
+ } else {
+ LOG.warn("ClusterNodeInfoDocument for id {} does not exist anymore", id);
+ break;
}
-
- LOG.error(errorMsg);
-
- handleLeaseFailure(errorMsg);
- // should never be reached: handleLeaseFailure throws a DocumentStoreException
- return false;
- }
- previousLeaseEndTime = leaseEndTime; // store previousLeaseEndTime for reference for next time
- String mode = (String) doc.get(READ_WRITE_MODE_KEY);
- if (mode != null && !mode.equals(readWriteMode)) {
- readWriteMode = mode;
- store.setReadWriteMode(mode);
}
- renewed = true;
- return true;
+ throw dse;
}
/**
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java?rev=1760492&r1=1760491&r2=1760492&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java Tue Sep 13 08:21:57 2016
@@ -23,7 +23,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -82,7 +81,6 @@ public class ClusterNodeInfoTest {
}
// OAK-4770
- @Ignore
@Test
public void renewLeaseExceptionAfter() throws Exception {
ClusterNodeInfo info = newClusterNodeInfo(1);
@@ -105,6 +103,92 @@ public class ClusterNodeInfoTest {
}
@Test
+ public void renewLeaseExceptionBeforeWithDelay() throws Exception {
+ ClusterNodeInfo info = newClusterNodeInfo(1);
+ waitLeaseUpdateInterval();
+ store.setFailBeforeUpdate(1);
+ // delay operations by half the lease time, this will
+ // first delay the update and then delay the subsequent
+ // find because of the exception on update. afterwards the
+ // lease must be expired
+ store.setDelayMillis(info.getLeaseTime() / 2);
+ try {
+ info.renewLease();
+ fail("must throw DocumentStoreException");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ assertTrue(info.getLeaseEndTime() < clock.getTime());
+ }
+
+ @Test
+ public void renewLeaseExceptionAfterWithDelay() throws Exception {
+ ClusterNodeInfo info = newClusterNodeInfo(1);
+ long leaseEnd = info.getLeaseEndTime();
+ waitLeaseUpdateInterval();
+ store.setFailAfterUpdate(1);
+ // delay operations by half the lease time, this will
+ // first delay the update and then delay the subsequent
+ // find because of the exception on update. afterwards
+ // the leaseEnd must reflect the updated value
+ store.setDelayMillis(info.getLeaseTime() / 2);
+ try {
+ info.renewLease();
+ fail("must throw DocumentStoreException");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ assertTrue(info.getLeaseEndTime() > leaseEnd);
+ }
+
+ @Test
+ public void renewLeaseExceptionAfterFindFails() throws Exception {
+ ClusterNodeInfo info = newClusterNodeInfo(1);
+ long leaseEnd = info.getLeaseEndTime();
+ waitLeaseUpdateInterval();
+ store.setFailAfterUpdate(1);
+ store.setFailFind(1);
+ // delay operations by half the lease time, this will
+ // first delay the update and then delay and fail the
+ // subsequent find once.
+ store.setDelayMillis(info.getLeaseTime() / 2);
+ try {
+ info.renewLease();
+ fail("must throw DocumentStoreException");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ assertEquals(0, store.getFailFind());
+ // must not reflect the updated value, because retries
+ // to read the current cluster node info document stop
+ // once lease expires
+ assertEquals(leaseEnd, info.getLeaseEndTime());
+ }
+
+ @Test
+ public void renewLeaseExceptionAfterFindSucceedsEventually() throws Exception {
+ ClusterNodeInfo info = newClusterNodeInfo(1);
+ waitLeaseUpdateInterval();
+ // delay operations by a sixth of the lease time, this will
+ // first delay the update and then delay and fail the
+ // subsequent find calls. find retries should eventually
+ // succeed within the lease time
+ store.setDelayMillis(info.getLeaseTime() / 6);
+ store.setFailAfterUpdate(1);
+ store.setFailFind(3);
+ try {
+ info.renewLease();
+ fail("must throw DocumentStoreException");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ // the three retries must eventually succeed within the lease time
+ assertEquals(0, store.getFailFind());
+ // must reflect the updated value
+ assertTrue(info.getLeaseEndTime() > clock.getTime());
+ }
+
+ @Test
public void renewLeaseDelayed() throws Exception {
ClusterNodeInfo info = newClusterNodeInfo(1);
clock.waitUntil(info.getLeaseEndTime() + ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
@@ -142,6 +226,26 @@ public class ClusterNodeInfoTest {
}
}
+ @Test
+ public void renewLeaseTimedOutWithCheck() throws Exception {
+ ClusterNodeInfo info = newClusterNodeInfo(1);
+ // wait until after lease end
+ clock.waitUntil(info.getLeaseEndTime() + ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
+ try {
+ info.performLeaseCheck();
+ fail("lease check must fail with exception");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ // cluster node 1 must not be able to renew the lease now
+ try {
+ // must either return false
+ assertFalse(info.renewLease());
+ } catch (DocumentStoreException e) {
+ // or throw an exception
+ }
+ }
+
private void recoverClusterNode(int clusterId) throws Exception {
DocumentNodeStore ns = new DocumentMK.Builder()
.setDocumentStore(store.getStore()) // use unwrapped store
@@ -183,6 +287,7 @@ public class ClusterNodeInfoTest {
private final AtomicInteger failBeforeUpdate = new AtomicInteger();
private final AtomicInteger failAfterUpdate = new AtomicInteger();
+ private final AtomicInteger failFind = new AtomicInteger();
private long delayMillis;
TestStore() {
@@ -203,6 +308,14 @@ public class ClusterNodeInfoTest {
return doc;
}
+ @Override
+ public <T extends Document> T find(Collection<T> collection,
+ String key) {
+ maybeDelay();
+ maybeThrow(failFind, "find failed");
+ return super.find(collection, key);
+ }
+
private void maybeDelay() {
try {
clock.waitUntil(clock.getTime() + delayMillis);
@@ -241,5 +354,13 @@ public class ClusterNodeInfoTest {
public void setDelayMillis(long delayMillis) {
this.delayMillis = delayMillis;
}
+
+ public int getFailFind() {
+ return failFind.get();
+ }
+
+ public void setFailFind(int num) {
+ this.failFind.set(num);
+ }
}
}