You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2018/07/03 15:25:39 UTC
svn commit: r1834986 [1/2] - in /jackrabbit/oak/trunk:
oak-run/src/main/java/org/apache/jackrabbit/oak/run/
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/...
Author: mreutegg
Date: Tue Jul 3 15:25:38 2018
New Revision: 1834986
URL: http://svn.apache.org/viewvc?rev=1834986&view=rev
Log:
OAK-7316: Greedy ClusterNodeInfo
Added:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparatorTest.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerTest.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLockTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BaseDocumentDiscoveryLiteServiceTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FormatVersionTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RandomDocumentNodeStoreSweepTest.java
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java Tue Jul 3 15:25:38 2018
@@ -50,7 +50,7 @@ class RecoveryCommand implements Command
System.exit(1);
}
MongoDocumentStore docStore = (MongoDocumentStore) dns.getDocumentStore();
- LastRevRecoveryAgent agent = new LastRevRecoveryAgent(dns);
+ LastRevRecoveryAgent agent = new LastRevRecoveryAgent(docStore, dns);
MongoMissingLastRevSeeker seeker = new MongoMissingLastRevSeeker(
docStore, dns.getClock());
CloseableIterable<NodeDocument> docs = seeker.getCandidates(0);
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Tue Jul 3 15:25:38 2018
@@ -18,17 +18,23 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.NONE;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getModuleVersion;
import java.lang.management.ManagementFactory;
import java.net.NetworkInterface;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -161,7 +167,7 @@ public class ClusterNodeInfo {
/**
* The unique machine id (the MAC address if available).
*/
- private static final String MACHINE_ID = getMachineId();
+ private static final String MACHINE_ID = getHardwareMachineId();
/**
* The process id (if available).
@@ -296,7 +302,7 @@ public class ClusterNodeInfo {
/**
* The state of the cluster node.
*/
- private ClusterNodeState state;
+ private ClusterNodeState state = ACTIVE;
/**
* OAK-2739 / OAK-3397 : once a lease check turns out negative, this flag
@@ -314,17 +320,6 @@ public class ClusterNodeInfo {
private boolean leaseCheckDisabled;
/**
- * Tracks the fact whether the lease has *ever* been renewed by this instance
- * or has just be read from the document store at initialization time.
- */
- private boolean renewed;
-
- /**
- * The revLock value of the cluster;
- */
- private RecoverLockState revRecoveryLock;
-
- /**
* In memory flag indicating that this ClusterNode is entry is new and is being added to
* DocumentStore for the first time
*
@@ -339,21 +334,15 @@ public class ClusterNodeInfo {
*/
private LeaseFailureHandler leaseFailureHandler;
- private ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state,
- RecoverLockState revRecoveryLock, Long leaseEnd, boolean newEntry) {
+ private ClusterNodeInfo(int id, DocumentStore store, String machineId,
+ String instanceId, boolean newEntry) {
this.id = id;
this.startTime = getCurrentTime();
- if (leaseEnd == null) {
- this.leaseEndTime = startTime;
- } else {
- this.leaseEndTime = leaseEnd;
- }
- this.renewed = false; // will be updated once we renew it the first time
+ this.leaseEndTime = this.startTime +leaseTime;
+ this.previousLeaseEndTime = this.leaseEndTime;
this.store = store;
this.machineId = machineId;
this.instanceId = instanceId;
- this.state = state;
- this.revRecoveryLock = revRecoveryLock;
this.newEntry = newEntry;
this.leaseCheckDisabled = DEFAULT_LEASE_CHECK_DISABLED;
}
@@ -366,20 +355,29 @@ public class ClusterNodeInfo {
return id;
}
+ String getMachineId() {
+ return machineId;
+ }
+
+ String getInstanceId() {
+ return instanceId;
+ }
+
/**
- * Create a dummy cluster node info instance to be utilized for read only access to underlying store.
- * @param store
+ * Create a cluster node info instance to be utilized for read only access
+ * to underlying store.
+ *
+ * @param store the document store.
* @return the cluster node info
*/
public static ClusterNodeInfo getReadOnlyInstance(DocumentStore store) {
- return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, ACTIVE,
- RecoverLockState.NONE, null, true) {
+ return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, true) {
@Override
public void dispose() {
}
@Override
- public long getLeaseTime() {
+ public long getLeaseEndTime() {
return Long.MAX_VALUE;
}
@@ -401,42 +399,21 @@ public class ClusterNodeInfo {
}
/**
- * Create a cluster node info instance for the store, with the
- *
- * @param store the document store (for the lease)
- * @param configuredClusterId the configured cluster id (or 0 for dynamic assignment)
- * @return the cluster node info
- */
- public static ClusterNodeInfo getInstance(DocumentStore store, int configuredClusterId) {
- return getInstance(store, MACHINE_ID, WORKING_DIR, configuredClusterId, false);
- }
-
- /**
- * Create a cluster node info instance for the store.
- *
- * @param store the document store (for the lease)
- * @param machineId the machine id (null for MAC address)
- * @param instanceId the instance id (null for current working directory)
- * @return the cluster node info
- */
- public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
- String instanceId) {
- return getInstance(store, machineId, instanceId, 0, true);
- }
-
- /**
- * Create a cluster node info instance for the store.
+ * Get or create a cluster node info instance for the store.
*
* @param store the document store (for the lease)
+ * @param recoveryHandler the recovery handler to call for a clusterId with
+ * an expired lease.
* @param machineId the machine id (null for MAC address)
* @param instanceId the instance id (null for current working directory)
* @param configuredClusterId the configured cluster id (or 0 for dynamic assignment)
- * @param updateLease whether to update the lease
* @return the cluster node info
*/
- public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
- String instanceId, int configuredClusterId, boolean updateLease) {
-
+ public static ClusterNodeInfo getInstance(DocumentStore store,
+ RecoveryHandler recoveryHandler,
+ String machineId,
+ String instanceId,
+ int configuredClusterId) {
// defaults for machineId and instanceID
if (machineId == null) {
machineId = MACHINE_ID;
@@ -447,54 +424,70 @@ public class ClusterNodeInfo {
int retries = 10;
for (int i = 0; i < retries; i++) {
- ClusterNodeInfo clusterNode = createInstance(store, machineId, instanceId, configuredClusterId, i == 0);
+ Map.Entry<ClusterNodeInfo, Long> suggestedClusterNode =
+ createInstance(store, recoveryHandler, machineId,
+ instanceId, configuredClusterId, i == 0);
+ ClusterNodeInfo clusterNode = suggestedClusterNode.getKey();
+ Long currentStartTime = suggestedClusterNode.getValue();
String key = String.valueOf(clusterNode.id);
- UpdateOp update = new UpdateOp(key, true);
+ UpdateOp update = new UpdateOp(key, clusterNode.newEntry);
update.set(MACHINE_ID_KEY, clusterNode.machineId);
update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
- if (updateLease) {
- update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime);
- } else {
- update.set(LEASE_END_KEY, clusterNode.leaseEndTime);
- }
+ update.set(LEASE_END_KEY, clusterNode.leaseEndTime);
update.set(START_TIME_KEY, clusterNode.startTime);
update.set(INFO_KEY, clusterNode.toString());
- update.set(STATE, clusterNode.state.name());
- update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock.name());
+ update.set(STATE, ACTIVE.name());
update.set(OAK_VERSION_KEY, OAK_VERSION);
+ ClusterNodeInfoDocument before = null;
final boolean success;
if (clusterNode.newEntry) {
// For new entry do a create. This ensures that if two nodes
// create entry with same id then only one would succeed
success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
} else {
- // No expiration of earlier cluster info, so update
- store.createOrUpdate(Collection.CLUSTER_NODES, update);
- success = true;
+ // remember how the entry looked before the update
+ before = store.find(Collection.CLUSTER_NODES, key);
+
+ // perform a conditional update with a check on the startTime
+ // field. If there are competing cluster nodes trying to acquire
+ // the same inactive clusterId, only one of them will succeed.
+ update.equals(START_TIME_KEY, currentStartTime);
+ // ensure some other conditions
+ // 1) must not be active
+ update.notEquals(STATE, ACTIVE.name());
+ // 2) must not have a recovery lock
+ update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
+
+ success = store.findAndUpdate(Collection.CLUSTER_NODES, update) != null;
}
if (success) {
+ logClusterIdAcquired(clusterNode, before);
return clusterNode;
}
+ LOG.info("Collision while acquiring clusterId {}. Retrying...",
+ clusterNode.getId());
}
- throw new DocumentStoreException("Could not get cluster node info (retried " + retries + " times)");
+ throw new DocumentStoreException("Could not get cluster node info (tried " + retries + " times)");
}
- private static ClusterNodeInfo createInstance(DocumentStore store, String machineId,
- String instanceId, int configuredClusterId, boolean waitForLease) {
+ private static Map.Entry<ClusterNodeInfo, Long> createInstance(DocumentStore store,
+ RecoveryHandler recoveryHandler,
+ String machineId,
+ String instanceId,
+ int configuredClusterId,
+ boolean waitForLease) {
long now = getCurrentTime();
- int clusterNodeId = 0;
int maxId = 0;
- ClusterNodeState state = ClusterNodeState.NONE;
- RecoverLockState lockState = RecoverLockState.NONE;
- Long prevLeaseEnd = null;
- boolean newEntry = false;
ClusterNodeInfoDocument alreadyExistingConfigured = null;
String reuseFailureReason = "";
List<ClusterNodeInfoDocument> list = ClusterNodeInfoDocument.all(store);
+ Map<Integer, Long> startTimes = new HashMap<>();
+ SortedSet<ClusterNodeInfo> candidates = new TreeSet<>(
+ new ClusterNodeInfoComparator(machineId, instanceId));
for (ClusterNodeInfoDocument doc : list) {
@@ -510,6 +503,13 @@ public class ClusterNodeInfo {
maxId = Math.max(maxId, id);
+ // cannot use an entry without start time
+ if (doc.getStartTime() == -1) {
+ reuseFailureReason = reject(id,
+ "Cluster node entry does not have a startTime. ");
+ continue;
+ }
+
// if a cluster id was configured: check that and abort if it does
// not match
if (configuredClusterId != 0) {
@@ -524,7 +524,11 @@ public class ClusterNodeInfo {
String mId = "" + doc.get(MACHINE_ID_KEY);
String iId = "" + doc.get(INSTANCE_ID_KEY);
- if (leaseEnd != null && leaseEnd > now) {
+ // handle active clusterId with valid lease and no recovery lock
+ // -> potentially wait for lease if machine and instance id match
+ if (leaseEnd != null
+ && leaseEnd > now
+ && !doc.isRecoveryNeeded(now)) {
// wait if (a) instructed to, and (b) also the remaining time
// time is not much bigger than the lease interval (in which
// case something is very very wrong anyway)
@@ -532,46 +536,48 @@ public class ClusterNodeInfo {
&& iId.equals(instanceId)) {
boolean worthRetrying = waitForLeaseExpiry(store, doc, leaseEnd, machineId, instanceId);
if (worthRetrying) {
- return createInstance(store, machineId, instanceId, configuredClusterId, false);
+ return createInstance(store, recoveryHandler, machineId, instanceId, configuredClusterId, false);
}
}
- reuseFailureReason = "leaseEnd " + leaseEnd + " > " + now + " - " + (leaseEnd - now) + "ms in the future";
+ reuseFailureReason = reject(id,
+ "leaseEnd " + leaseEnd + " > " + now + " - " + (leaseEnd - now) + "ms in the future");
continue;
}
- // remove entries with "random:" keys if not in use (no lease at all)
- if (mId.startsWith(RANDOM_PREFIX) && leaseEnd == null) {
- store.remove(Collection.CLUSTER_NODES, key);
- LOG.debug("Cleaned up cluster node info for clusterNodeId {} [machineId: {}, leaseEnd: n/a]", id, mId);
- if (alreadyExistingConfigured == doc) {
- // we removed it, so we can't re-use it after all
- alreadyExistingConfigured = null;
+ // if we get here the clusterId either:
+ // 1) is inactive
+ // 2) needs recovery
+ if (doc.isRecoveryNeeded(now)) {
+ if (mId.equals(machineId) && iId.equals(instanceId)) {
+ // this id matches our environment and has an expired lease
+ // use it after a successful recovery
+ if (!recoveryHandler.recover(id)) {
+ reuseFailureReason = reject(id,
+ "needs recovery and was unable to perform it myself");
+ continue;
+ }
+ } else {
+ // a different machine or instance
+ reuseFailureReason = reject(id,
+ "needs recovery and machineId/instanceId do not match: " +
+ mId + "/" + iId + " != " + machineId + "/" + instanceId);
+ continue;
}
- continue;
}
- if (!mId.equals(machineId) || !iId.equals(instanceId)) {
- // a different machine or instance
- reuseFailureReason = "machineId/instanceId do not match: " + mId + "/" + iId + " != " + machineId + "/" + instanceId;
- continue;
- }
+ // if we get here the cluster node entry is inactive. if recovery
+ // was needed, then it was successful
- // a cluster node which matches current machine identity but
- // not being used
- if (clusterNodeId == 0 || id < clusterNodeId) {
- // if there are multiple, use the smallest value
- clusterNodeId = id;
- state = ClusterNodeState.fromString((String) doc.get(STATE));
- prevLeaseEnd = leaseEnd;
- lockState = RecoverLockState.fromString((String) doc.get(REV_RECOVERY_LOCK));
- }
+ // create a candidate. those with matching machine and instance id
+ // are preferred, then the one with the lowest clusterId.
+ candidates.add(new ClusterNodeInfo(id, store, mId, iId, false));
+ startTimes.put(id, doc.getStartTime());
}
- // No usable existing entry with matching signature found so
- // create a new entry
- if (clusterNodeId == 0) {
- newEntry = true;
+ if (candidates.isEmpty()) {
+ // No usable existing entry found
+ int clusterNodeId;
if (configuredClusterId != 0) {
if (alreadyExistingConfigured != null) {
throw new DocumentStoreException(
@@ -581,12 +587,41 @@ public class ClusterNodeInfo {
} else {
clusterNodeId = maxId + 1;
}
+ // No usable existing entry found so create a new entry
+ candidates.add(new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, true));
}
- // Do not expire entries and stick on the earlier state, and leaseEnd so,
- // that _lastRev recovery if needed is done.
- return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state,
- lockState, prevLeaseEnd, newEntry);
+ // use the best candidate
+ ClusterNodeInfo info = candidates.first();
+ // and replace with an info matching the current machine and instance id
+ info = new ClusterNodeInfo(info.id, store, machineId, instanceId, info.newEntry);
+ return new AbstractMap.SimpleImmutableEntry<>(info, startTimes.get(info.getId()));
+ }
+
+ private static void logClusterIdAcquired(ClusterNodeInfo clusterNode,
+ ClusterNodeInfoDocument before) {
+ String type = clusterNode.newEntry ? "new" : "existing";
+ String machineInfo = clusterNode.machineId;
+ String instanceInfo = clusterNode.instanceId;
+ if (before != null) {
+ // machineId or instanceId may have changed
+ String beforeMachineId = String.valueOf(before.get(MACHINE_ID_KEY));
+ String beforeInstanceId = String.valueOf(before.get(INSTANCE_ID_KEY));
+ if (!clusterNode.machineId.equals(beforeMachineId)) {
+ machineInfo = "(changed) " + beforeMachineId + " -> " + machineInfo;
+ }
+ if (!clusterNode.instanceId.equals(beforeInstanceId)) {
+ instanceInfo = "(changed) " + beforeInstanceId + " -> " + instanceInfo;
+ }
+ }
+ LOG.info("Acquired ({}) clusterId {}. MachineId {}, InstanceId {}",
+ type, clusterNode.getId(), machineInfo, instanceInfo);
+
+ }
+
+ private static String reject(int id, String reason) {
+ LOG.debug("Cannot acquire {}: {}", id, reason);
+ return reason;
}
private static boolean waitForLeaseExpiry(DocumentStore store, ClusterNodeInfoDocument cdoc, long leaseEnd, String machineId,
@@ -645,7 +680,7 @@ public class ClusterNodeInfo {
* @throws DocumentStoreException if the lease expired.
*/
public void performLeaseCheck() throws DocumentStoreException {
- if (leaseCheckDisabled || !renewed) {
+ if (leaseCheckDisabled) {
// if leaseCheckDisabled is set we never do the check, so return fast
// the 'renewed' flag indicates if this instance *ever* renewed the lease after startup
@@ -818,9 +853,8 @@ public class ClusterNodeInfo {
UpdateOp update = new UpdateOp("" + id, false);
update.set(LEASE_END_KEY, updatedLeaseEndTime);
- update.set(STATE, ACTIVE.name());
- if (renewed && !leaseCheckDisabled) {
+ if (!leaseCheckDisabled) {
// if leaseCheckDisabled, then we just update the lease without
// checking
// OAK-3398:
@@ -829,12 +863,12 @@ public class ClusterNodeInfo {
// then we can now make an assertion that the lease is unchanged
// and the incremental update must only succeed if no-one else
// did a recover/inactivation in the meantime
- // make two assertions: the leaseEnd must match ..
+ // make three assertions: the leaseEnd must match ..
update.equals(LEASE_END_KEY, null, previousLeaseEndTime);
// plus it must still be active ..
update.equals(STATE, null, ACTIVE.name());
// plus it must not have a recovery lock on it
- update.notEquals(REV_RECOVERY_LOCK, RecoverLockState.ACQUIRED.name());
+ update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
// @TODO: to make it 100% failure proof we could introduce
// yet another field to clusterNodes: a runtimeId that we
// create (UUID) at startup each time - and against that
@@ -900,7 +934,6 @@ public class ClusterNodeInfo {
readWriteMode = mode;
store.setReadWriteMode(mode);
}
- renewed = true;
return true;
} catch (DocumentStoreException e) {
dse = e;
@@ -1014,8 +1047,8 @@ public class ClusterNodeInfo {
UpdateOp update = new UpdateOp("" + id, true);
update.set(LEASE_END_KEY, null);
update.set(STATE, null);
- update.set(REV_RECOVERY_LOCK, RecoverLockState.NONE.name());
store.createOrUpdate(Collection.CLUSTER_NODES, update);
+ state = NONE;
}
@Override
@@ -1028,7 +1061,6 @@ public class ClusterNodeInfo {
"uuid: " + uuid + ",\n" +
"readWriteMode: " + readWriteMode + ",\n" +
"state: " + state + ",\n" +
- "revLock: " + revRecoveryLock + ",\n" +
"oakVersion: " + OAK_VERSION + ",\n" +
"formatVersion: " + DocumentNodeStore.VERSION;
}
@@ -1080,7 +1112,7 @@ public class ClusterNodeInfo {
*
* @return the unique id
*/
- private static String getMachineId() {
+ private static String getHardwareMachineId() {
Exception exception = null;
try {
ArrayList<String> macAddresses = new ArrayList<String>();
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java Tue Jul 3 15:25:38 2018
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Comparator;
+
+/**
+ * Comparator for {@link ClusterNodeInfo} instances based on a given environment
+ * defined by a {@code machineId} and {@code instanceId}. The comparator orders
+ * cluster node info with a matching environment before others that do not match
+ * and then compares the clusterId, lower values first.
+ */
+class ClusterNodeInfoComparator implements Comparator<ClusterNodeInfo> {
+
+ private static final Comparator<Boolean> BOOLEAN_REVERSED = Comparator.comparing(Boolean::booleanValue).reversed();
+
+ private final String machineId;
+ private final String instanceId;
+
+ ClusterNodeInfoComparator(String machineId,
+ String instanceId) {
+ this.machineId = machineId;
+ this.instanceId = instanceId;
+ }
+
+ @Override
+ public int compare(ClusterNodeInfo info1,
+ ClusterNodeInfo info2) {
+ // first compare whether the environment matches
+ return Comparator.comparing(this::matchesEnvironment, BOOLEAN_REVERSED)
+ // then compare the clusterIds
+ .thenComparingInt(ClusterNodeInfo::getId).compare(info1, info2);
+ }
+
+ private boolean matchesEnvironment(ClusterNodeInfo info) {
+ return machineId.equals(info.getMachineId())
+ && instanceId.equals(info.getInstanceId());
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoComparator.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java Tue Jul 3 15:25:38 2018
@@ -82,6 +82,23 @@ public class ClusterNodeInfoDocument ext
}
/**
+ * Check if _lastRev recovery is needed for cluster node info document.
+ * Returns {@code true} if both of the below conditions are {@code true}:
+ * <ul>
+ * <li>State is Active</li>
+ * <li>Current time is past the leaseEnd time or there is a recovery
+ * lock on the cluster node info document</li>
+ * </ul>
+ * @param currentTimeMillis the current time in milliseconds since the start
+ * of the epoch.
+ */
+ public boolean isRecoveryNeeded(long currentTimeMillis) {
+ return isActive() &&
+ (currentTimeMillis > getLeaseEndTime() ||
+ isBeingRecovered());
+ }
+
+ /**
* Returns {@code true} if the cluster node represented by this document
* is currently being recovered by the given {@code clusterId}.
*
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Jul 3 15:25:38 2018
@@ -29,7 +29,6 @@ import static java.util.concurrent.TimeU
import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
import static org.apache.jackrabbit.oak.commons.PathUtils.ROOT_PATH;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
-import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder.MANY_CHILDREN_THRESHOLD;
@@ -177,14 +176,6 @@ public final class DocumentNodeStore
private boolean fairBackgroundOperationLock =
Boolean.parseBoolean(System.getProperty("oak.fairBackgroundOperationLock", "true"));
- /**
- * The timeout in milliseconds to wait for the recovery performed by
- * another cluster node.
- */
- private long recoveryWaitTimeoutMS =
- Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
-
-
public static final String SYS_PROP_DISABLE_JOURNAL = "oak.disableJournalDiff";
/**
* Feature flag to disable the journal diff mechanism. See OAK-4528.
@@ -537,22 +528,24 @@ public final class DocumentNodeStore
readOnlyMode = false;
}
checkVersion(s, readOnlyMode);
+ this.nonLeaseCheckingStore = s;
this.executor = builder.getExecutor();
+ this.lastRevSeeker = builder.createMissingLastRevSeeker();
this.clock = builder.getClock();
int cid = builder.getClusterId();
cid = Integer.getInteger("oak.documentMK.clusterId", cid);
if (readOnlyMode) {
- clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(s);
+ clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(nonLeaseCheckingStore);
} else {
- clusterNodeInfo = ClusterNodeInfo.getInstance(s, cid);
+ clusterNodeInfo = ClusterNodeInfo.getInstance(nonLeaseCheckingStore,
+ new RecoveryHandlerImpl(nonLeaseCheckingStore, clock, lastRevSeeker),
+ null, null, cid);
}
// TODO we should ensure revisions generated from now on
// are never "older" than revisions already in the repository for
// this cluster id
- cid = clusterNodeInfo.getId();
-
- this.nonLeaseCheckingStore = s;
+ this.clusterId = clusterNodeInfo.getId();
if (builder.getLeaseCheck()) {
s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
@@ -560,11 +553,21 @@ public final class DocumentNodeStore
} else {
clusterNodeInfo.setLeaseCheckDisabled(true);
}
+ String threadNamePostfix = "(" + clusterId + ")";
+ leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, isDisposed),
+ "DocumentNodeStore lease update thread " + threadNamePostfix);
+ leaseUpdateThread.setDaemon(true);
+ if (!readOnlyMode) {
+ // OAK-3398 : make lease updating more robust by ensuring it
+ // has higher likelihood of succeeding than other threads
+ // on a very busy machine - so as to prevent lease timeout.
+ leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
+ leaseUpdateThread.start();
+ }
this.journalPropertyHandlerFactory = builder.getJournalPropertyHandlerFactory();
this.store = s;
this.changes = newJournalEntry();
- this.clusterId = cid;
this.branches = new UnmergedBranches();
this.asyncDelay = builder.getAsyncDelay();
this.versionGarbageCollector = new VersionGarbageCollector(
@@ -575,8 +578,8 @@ public final class DocumentNodeStore
this, builder.getJournalGCMaxAge());
this.referencedBlobs =
builder.createReferencedBlobs(this);
- this.lastRevSeeker = builder.createMissingLastRevSeeker();
- this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this, lastRevSeeker);
+ this.lastRevRecoveryAgent = new LastRevRecoveryAgent(store, this,
+ lastRevSeeker, clusterId -> this.signalClusterStateChange());
this.disableBranches = builder.isDisableBranches();
this.missing = new DocumentNodeState(this, "MISSING",
new RevisionVector(new Revision(0, 0, 0))) {
@@ -597,7 +600,6 @@ public final class DocumentNodeStore
builder.getWeigher(), builder.getChildrenCacheSize());
diffCache = builder.getDiffCache(this.clusterId);
- checkpoints = new Checkpoints(this);
// check if root node exists
NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath("/"));
@@ -625,9 +627,6 @@ public final class DocumentNodeStore
}
} else {
sweepRevisions = sweepRevisions.pmax(rootDoc.getSweepRevisions());
- if (!readOnlyMode) {
- checkLastRevRecovery();
- }
initializeRootState(rootDoc);
// check if _lastRev for our clusterId exists
if (!rootDoc.getLastRev().containsKey(clusterId)) {
@@ -647,9 +646,7 @@ public final class DocumentNodeStore
}
}
- // Renew the lease because it may have been stale
- renewClusterIdLease();
-
+ checkpoints = new Checkpoints(this);
// initialize branchCommits
branches.init(store, this);
@@ -657,7 +654,6 @@ public final class DocumentNodeStore
new PrefetchDispatcher(getRoot(), executor) :
new ChangeDispatcher(getRoot());
commitQueue = new CommitQueue(this);
- String threadNamePostfix = "(" + clusterId + ")";
batchCommitQueue = new BatchCommitQueue(store);
// prepare background threads
backgroundReadThread = new Thread(
@@ -675,19 +671,10 @@ public final class DocumentNodeStore
clusterUpdateThread = new Thread(new BackgroundClusterUpdate(this, isDisposed),
"DocumentNodeStore cluster update thread " + threadNamePostfix);
clusterUpdateThread.setDaemon(true);
- leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, isDisposed),
- "DocumentNodeStore lease update thread " + threadNamePostfix);
- leaseUpdateThread.setDaemon(true);
// now start the background threads
clusterUpdateThread.start();
backgroundReadThread.start();
if (!readOnlyMode) {
- // OAK-3398 : make lease updating more robust by ensuring it
- // has higher likelihood of succeeding than other threads
- // on a very busy machine - so as to prevent lease timeout.
- leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
- leaseUpdateThread.start();
-
// perform an initial document sweep if needed
// this may be long running if there is no sweep revision
// for this clusterId (upgrade from Oak <= 1.6).
@@ -716,31 +703,6 @@ public final class DocumentNodeStore
}
}
-
- /**
- * Recover _lastRev recovery if needed.
- *
- * @throws DocumentStoreException if recovery did not finish within
- * {@link #recoveryWaitTimeoutMS}.
- */
- private void checkLastRevRecovery() throws DocumentStoreException {
- long timeout = clock.getTime() + recoveryWaitTimeoutMS;
- int numRecovered = lastRevRecoveryAgent.recover(clusterId, timeout);
- if (numRecovered == -1) {
- ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId));
- String otherId = "n/a";
- if (doc != null) {
- otherId = String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
- }
- String msg = "This cluster node (" + clusterId + ") requires " +
- "_lastRev recovery which is currently performed by " +
- "another cluster node (" + otherId + "). Recovery is " +
- "still ongoing after " + recoveryWaitTimeoutMS + " ms. " +
- "Failing startup of this DocumentNodeStore now!";
- throw new DocumentStoreException(msg);
- }
- }
-
public void dispose() {
LOG.info("Starting disposal of DocumentNodeStore with clusterNodeId: {} ({})", clusterId,
getClusterNodeInfoDisplayString());
@@ -2985,7 +2947,7 @@ public final class DocumentNodeStore
this.clusterStateChangeListener = clusterStateChangeListener;
}
- void signalClusterStateChange() {
+ private void signalClusterStateChange() {
if (clusterStateChangeListener != null) {
clusterStateChangeListener.handleClusterStateChange();
}
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Tue Jul 3 15:25:38 2018
@@ -33,10 +33,10 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import javax.annotation.CheckForNull;
-import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
@@ -65,19 +65,28 @@ import org.slf4j.LoggerFactory;
public class LastRevRecoveryAgent {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final DocumentNodeStore nodeStore;
+ private final DocumentStore store;
+
+ private final RevisionContext revisionContext;
private final MissingLastRevSeeker missingLastRevUtil;
- public LastRevRecoveryAgent(DocumentNodeStore nodeStore,
- MissingLastRevSeeker seeker) {
- this.nodeStore = nodeStore;
+ private final Consumer<Integer> afterRecovery;
+
+ public LastRevRecoveryAgent(DocumentStore store,
+ RevisionContext revisionContext,
+ MissingLastRevSeeker seeker,
+ Consumer<Integer> afterRecovery) {
+ this.store = store;
+ this.revisionContext = revisionContext;
this.missingLastRevUtil = seeker;
+ this.afterRecovery = afterRecovery;
}
- public LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
- this(nodeStore, new MissingLastRevSeeker(
- nodeStore.getDocumentStore(), nodeStore.getClock()));
+ public LastRevRecoveryAgent(DocumentStore store, RevisionContext context) {
+ this(store, context,
+ new MissingLastRevSeeker(store, context.getClock()),
+ i -> {});
}
/**
@@ -85,7 +94,15 @@ public class LastRevRecoveryAgent {
* nodes. If another cluster node is already performing the recovery for the
* given {@code clusterId}, this method will {@code waitUntil} the given
* time in milliseconds for the recovery to finish.
- *
+ * <p>
+ * If recovery is performed for the clusterId as exposed by the revision
+ * context passed to the constructor of this recovery agent, then this
+ * method will put a deadline on how long recovery may take. The deadline
+ * is the current lease end as read from the {@code clusterNodes} collection
+ * entry for the {@code clusterId} to recover minus the
+ * {@link ClusterNodeInfo#DEFAULT_LEASE_FAILURE_MARGIN_MILLIS}. This method
+ * will throw a {@link DocumentStoreException} if the deadline is reached.
+ * <p>
* This method will return:
* <ul>
* <li>{@code -1} when another cluster node is busy performing recovery
@@ -104,22 +121,17 @@ public class LastRevRecoveryAgent {
* already performing the recovery.
* @return the number of restored nodes or {@code -1} if a timeout occurred
* while waiting for an ongoing recovery by another cluster node.
+ * @throws DocumentStoreException if the deadline is reached or some other
+ * error occurs while reading from the underlying document store.
*/
public int recover(int clusterId, long waitUntil)
throws DocumentStoreException {
ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
- //TODO Currently leaseTime remains same per cluster node. If this
- //is made configurable then it should be read from DB entry
- final long leaseTime = ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
- final long asyncDelay = nodeStore.getAsyncDelay();
-
if (nodeInfo != null) {
// Check if _lastRev recovery needed for this cluster node
// state is Active && current time past leaseEnd
- if (missingLastRevUtil.isRecoveryNeeded(nodeInfo)) {
- long leaseEnd = nodeInfo.getLeaseEndTime();
-
+ if (nodeInfo.isRecoveryNeeded(revisionContext.getClock().getTime())) {
// retrieve the root document's _lastRev
NodeDocument root = missingLastRevUtil.getRoot();
Revision lastRev = root.getLastRev().get(clusterId);
@@ -134,10 +146,9 @@ public class LastRevRecoveryAgent {
startTime = lastRev.getTimestamp();
reason = "lastRev: " + lastRev.toString();
} else {
- startTime = leaseEnd - leaseTime - asyncDelay;
+ startTime = nodeInfo.getStartTime();
reason = String.format(
- "no lastRev for root, using timestamp based on leaseEnd %d - leaseTime %d - asyncDelay %d", leaseEnd,
- leaseTime, asyncDelay);
+ "no lastRev for root, using startTime %d", startTime);
}
if (sweepRev != null && sweepRev.getTimestamp() < startTime) {
startTime = sweepRev.getTimestamp();
@@ -159,24 +170,37 @@ public class LastRevRecoveryAgent {
* @param clusterId the cluster id for which the _lastRev are to be recovered
* @return the number of restored nodes or {@code -1} if there is an ongoing
* recovery by another cluster node.
+ * @throws DocumentStoreException if the deadline is reached or some other
+ * error occurs while reading from the underlying document store.
*/
- public int recover(int clusterId) {
+ public int recover(int clusterId) throws DocumentStoreException {
return recover(clusterId, 0);
}
/**
- * Recover the correct _lastRev updates for the given candidate nodes.
+ * Same as {@link #recover(Iterable, int, boolean)} with {@code dryRun} set
+ * to {@code false}.
*
* @param suspects the potential suspects
* @param clusterId the cluster id for which _lastRev recovery needed
* @return the number of documents that required recovery.
+ * @throws DocumentStoreException if the deadline is reached or some other
+ * error occurs while reading from the underlying document store.
*/
- public int recover(Iterable<NodeDocument> suspects, int clusterId) {
+ public int recover(Iterable<NodeDocument> suspects, int clusterId)
+ throws DocumentStoreException {
return recover(suspects, clusterId, false);
}
/**
- * Recover the correct _lastRev updates for the given candidate nodes.
+ * Recover the correct _lastRev updates for the given candidate nodes. If
+ * recovery is performed for the clusterId as exposed by the revision
+ * context passed to the constructor of this recovery agent, then this
+ * method will put a deadline on how long recovery may take. The deadline
+ * is the current lease end as read from the {@code clusterNodes} collection
+ * entry for the {@code clusterId} to recover minus the
+ * {@link ClusterNodeInfo#DEFAULT_LEASE_FAILURE_MARGIN_MILLIS}. This method
+ * will throw a {@link DocumentStoreException} if the deadline is reached.
*
* @param suspects the potential suspects
* @param clusterId the cluster id for which _lastRev recovery needed
@@ -185,12 +209,24 @@ public class LastRevRecoveryAgent {
* @return the number of documents that required recovery. This method
* returns the number of the affected documents even if
* {@code dryRun} is set true and no document was changed.
+ * @throws DocumentStoreException if the deadline is reached or some other
+ * error occurs while reading from the underlying document store.
*/
public int recover(final Iterable<NodeDocument> suspects,
final int clusterId, final boolean dryRun)
throws DocumentStoreException {
- final DocumentStore docStore = nodeStore.getDocumentStore();
- NodeDocument rootDoc = Utils.getRootDocument(docStore);
+ // set a deadline if this is a self recovery. Self recovery does not
+ // update the lease in a background thread and must terminate before
+ // the lease acquired by the recovery lock expires.
+ long deadline = Long.MAX_VALUE;
+ if (clusterId == revisionContext.getClusterId()) {
+ ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
+ if (nodeInfo != null && nodeInfo.isActive()) {
+ deadline = nodeInfo.getLeaseEndTime() - ClusterNodeInfo.DEFAULT_LEASE_FAILURE_MARGIN_MILLIS;
+ }
+ }
+
+ NodeDocument rootDoc = Utils.getRootDocument(store);
// first run a sweep
final AtomicReference<Revision> sweepRev = new AtomicReference<>();
@@ -199,8 +235,8 @@ public class LastRevRecoveryAgent {
// sweep revision. Initial sweep is not the responsibility
// of the recovery agent.
final RevisionContext context = new RecoveryContext(rootDoc,
- nodeStore.getClock(), clusterId,
- nodeStore::getCommitValue);
+ revisionContext.getClock(), clusterId,
+ revisionContext::getCommitValue);
final NodeDocumentSweeper sweeper = new NodeDocumentSweeper(context, true);
sweeper.sweep(suspects, new NodeDocumentSweepListener() {
@Override
@@ -213,16 +249,16 @@ public class LastRevRecoveryAgent {
return;
}
// create an invalidate entry
- JournalEntry inv = JOURNAL.newDocument(docStore);
+ JournalEntry inv = JOURNAL.newDocument(store);
inv.modified(updates.keySet());
Revision r = context.newRevision().asBranchRevision();
UpdateOp invOp = inv.asUpdateOp(r);
// and reference it from a regular entry
- JournalEntry entry = JOURNAL.newDocument(docStore);
+ JournalEntry entry = JOURNAL.newDocument(store);
entry.invalidate(Collections.singleton(r));
Revision jRev = context.newRevision();
UpdateOp jOp = entry.asUpdateOp(jRev);
- if (!docStore.create(JOURNAL, newArrayList(invOp, jOp))) {
+ if (!store.create(JOURNAL, newArrayList(invOp, jOp))) {
String msg = "Unable to create journal entries for " +
"document invalidation.";
throw new DocumentStoreException(msg);
@@ -230,7 +266,7 @@ public class LastRevRecoveryAgent {
sweepRev.set(Utils.max(sweepRev.get(), jRev));
// now that journal entry is in place, perform the actual
// updates on the documents
- docStore.createOrUpdate(NODES, newArrayList(updates.values()));
+ store.createOrUpdate(NODES, newArrayList(updates.values()));
log.info("Sweeper updated {}", updates.keySet());
}
});
@@ -242,7 +278,7 @@ public class LastRevRecoveryAgent {
//Map of known last rev of checked paths
Map<String, Revision> knownLastRevOrModification = MapFactory.getInstance().create();
- final JournalEntry changes = JOURNAL.newDocument(docStore);
+ final JournalEntry changes = JOURNAL.newDocument(store);
long count = 0;
for (NodeDocument doc : suspects) {
@@ -292,7 +328,7 @@ public class LastRevRecoveryAgent {
// we don't know when the document was last modified with
// the given clusterId. need to read from store
String id = Utils.getIdFromPath(parentPath);
- NodeDocument doc = docStore.find(NODES, id);
+ NodeDocument doc = store.find(NODES, id);
if (doc != null) {
Revision lastRev = doc.getLastRev().get(clusterId);
Revision lastMod = determineLastModification(doc, clusterId);
@@ -328,6 +364,15 @@ public class LastRevRecoveryAgent {
log.info("Dry run of lastRev recovery identified [{}] documents for " +
"cluster node [{}]: {}", size, clusterId, updates);
} else {
+ // check deadline before the update
+ if (revisionContext.getClock().getTime() > deadline) {
+ String msg = String.format("Cluster node %d was unable to " +
+ "perform lastRev recovery for clusterId %d within " +
+ "deadline: %s", clusterId, clusterId,
+ Utils.timestampToString(deadline));
+ throw new DocumentStoreException(msg);
+ }
+
//UnsavedModifications is designed to be used in concurrent
//access mode. For recovery case there is no concurrent access
//involved so just pass a new lock instance
@@ -336,7 +381,7 @@ public class LastRevRecoveryAgent {
// thus it doesn't matter, where exactly the check is done
// as to whether the recovered lastRev has already been
// written to the journal.
- unsaved.persist(docStore, new Supplier<Revision>() {
+ unsaved.persist(store, new Supplier<Revision>() {
@Override
public Revision get() {
return sweepRev.get();
@@ -357,7 +402,7 @@ public class LastRevRecoveryAgent {
}
final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
- final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id);
+ final JournalEntry existingEntry = store.find(Collection.JOURNAL, id);
if (existingEntry != null) {
// then the journal entry was already written - as can happen if
// someone else (or the original instance itself) wrote the
@@ -368,7 +413,7 @@ public class LastRevRecoveryAgent {
}
// otherwise store a new journal entry now
- docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
+ store.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
}
}, new ReentrantLock());
@@ -379,9 +424,19 @@ public class LastRevRecoveryAgent {
return size;
}
+ //--------------------------< internal >------------------------------------
+
/**
* Retrieves possible candidates which have been modified after the given
* {@code startTime} and recovers the missing updates.
+ * <p>
+ * If recovery is performed for the clusterId as exposed by the revision
+ * context passed to the constructor of this recovery agent, then this
+ * method will put a deadline on how long recovery may take. The deadline
+ * is the current lease end as read from the {@code clusterNodes} collection
+ * entry for the {@code clusterId} to recover minus the
+ * {@link ClusterNodeInfo#DEFAULT_LEASE_FAILURE_MARGIN_MILLIS}. This method
+ * will throw a {@link DocumentStoreException} if the deadline is reached.
*
* @param nodeInfo the info of the cluster node to recover.
* @param startTime the start time
@@ -391,20 +446,23 @@ public class LastRevRecoveryAgent {
* @return the number of restored nodes or {@code -1} if recovery is still
* ongoing by another process even when {@code waitUntil} time was
* reached.
+ * @throws DocumentStoreException if the deadline is reached or some other
+ * error occurs while reading from the underlying document store.
*/
private int recoverCandidates(final ClusterNodeInfoDocument nodeInfo,
final long startTime,
final long waitUntil,
- final String info) {
+ final String info)
+ throws DocumentStoreException {
ClusterNodeInfoDocument infoDoc = nodeInfo;
int clusterId = infoDoc.getClusterId();
for (;;) {
if (missingLastRevUtil.acquireRecoveryLock(
- clusterId, nodeStore.getClusterId())) {
+ clusterId, revisionContext.getClusterId())) {
break;
}
- Clock clock = nodeStore.getClock();
+ Clock clock = revisionContext.getClock();
long remaining = waitUntil - clock.getTime();
if (remaining < 0) {
// no need to wait for lock release, waitUntil already reached
@@ -426,7 +484,12 @@ public class LastRevRecoveryAgent {
throw new DocumentStoreException(msg, e);
}
infoDoc = missingLastRevUtil.getClusterNodeInfo(clusterId);
- if (!missingLastRevUtil.isRecoveryNeeded(infoDoc)) {
+ if (infoDoc == null) {
+ String msg = String.format("No cluster node info document " +
+ "for id %d", clusterId);
+ throw new DocumentStoreException(msg);
+ }
+ if (!infoDoc.isRecoveryNeeded(clock.getTime())) {
// meanwhile another process finished recovery
return 0;
}
@@ -449,8 +512,7 @@ public class LastRevRecoveryAgent {
}
} finally {
missingLastRevUtil.releaseRecoveryLock(clusterId, success);
-
- nodeStore.signalClusterStateChange();
+ afterRecovery.accept(clusterId);
}
}
@@ -474,7 +536,7 @@ public class LastRevRecoveryAgent {
// collect committed changes of this cluster node
for (Map.Entry<Revision, String> entry : filterKeys(valueMap, cp).entrySet()) {
Revision rev = entry.getKey();
- String cv = nodeStore.getCommitValue(rev, doc);
+ String cv = revisionContext.getCommitValue(rev, doc);
if (isCommitted(cv)) {
lastModified = Utils.max(lastModified, resolveCommitRevision(rev, cv));
break;
@@ -501,7 +563,7 @@ public class LastRevRecoveryAgent {
if (isRecoveryNeeded()) {
Iterable<Integer> clusterIds = getRecoveryCandidateNodes();
log.info("ClusterNodeId [{}] starting Last Revision Recovery for clusterNodeId(s) {}",
- nodeStore.getClusterId(), clusterIds);
+ revisionContext.getClusterId(), clusterIds);
for (int clusterId : clusterIds) {
if (recover(clusterId) == -1) {
log.info("Last Revision Recovery for cluster node {} " +
@@ -524,14 +586,10 @@ public class LastRevRecoveryAgent {
new Predicate<ClusterNodeInfoDocument>() {
@Override
public boolean apply(ClusterNodeInfoDocument input) {
- return nodeStore.getClusterId() != input.getClusterId() && missingLastRevUtil.isRecoveryNeeded(input);
- }
- }), new Function<ClusterNodeInfoDocument, Integer>() {
- @Override
- public Integer apply(ClusterNodeInfoDocument input) {
- return input.getClusterId();
+ return revisionContext.getClusterId() != input.getClusterId()
+ && input.isRecoveryNeeded(revisionContext.getClock().getTime());
}
- });
+ }), ClusterNodeInfoDocument::getClusterId);
}
private static class ClusterPredicate implements Predicate<Revision> {
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1834986&r1=1834985&r2=1834986&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Tue Jul 3 15:25:38 2018
@@ -19,24 +19,17 @@
package org.apache.jackrabbit.oak.plugins.document;
+import java.util.stream.StreamSupport;
+
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.stats.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_BY;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_LOCK;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
-import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.STATE;
import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
@@ -47,22 +40,12 @@ import static org.apache.jackrabbit.oak.
*/
public class MissingLastRevSeeker {
- private static final Logger LOG = LoggerFactory.getLogger(MissingLastRevSeeker.class);
-
protected final String ROOT_PATH = "/";
private final DocumentStore store;
protected final Clock clock;
- private final Predicate<ClusterNodeInfoDocument> isRecoveryNeeded =
- new Predicate<ClusterNodeInfoDocument>() {
- @Override
- public boolean apply(ClusterNodeInfoDocument nodeInfo) {
- return isRecoveryNeeded(nodeInfo);
- }
- };
-
public MissingLastRevSeeker(DocumentStore store, Clock clock) {
this.store = store;
this.clock = clock;
@@ -123,21 +106,8 @@ public class MissingLastRevSeeker {
* @return whether the lock has been acquired
*/
public boolean acquireRecoveryLock(int clusterId, int recoveredBy) {
- ClusterNodeInfoDocument doc = getClusterNodeInfo(clusterId);
- if (doc == null) {
- // this is unexpected...
- return false;
- }
- if (!isRecoveryNeeded(doc)) {
- return false;
- }
- boolean acquired = tryAcquireRecoveryLock(doc, recoveredBy);
- if (acquired) {
- return true;
- }
- // either we already own the lock or were able to break the lock
- return doc.isBeingRecoveredBy(recoveredBy)
- || tryBreakRecoveryLock(doc, recoveredBy);
+ return new RecoveryLock(store, clock, clusterId)
+ .acquireRecoveryLock(recoveredBy);
}
/**
@@ -152,117 +122,34 @@ public class MissingLastRevSeeker {
* @param success whether recovery was successful.
*/
public void releaseRecoveryLock(int clusterId, boolean success) {
- try {
- UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
- update.set(REV_RECOVERY_LOCK, RecoverLockState.NONE.name());
- update.set(REV_RECOVERY_BY, null);
- if (success) {
- update.set(STATE, null);
- }
- ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
- if (old == null) {
- throw new RuntimeException("ClusterNodeInfo document for " + clusterId + " missing.");
- }
- LOG.info("Released recovery lock for cluster id {} (recovery successful: {})",
- clusterId, success);
- } catch (RuntimeException ex) {
- LOG.error("Failed to release the recovery lock for clusterNodeId " + clusterId, ex);
- throw (ex);
- }
+ new RecoveryLock(store, clock, clusterId).releaseRecoveryLock(success);
}
public NodeDocument getRoot() {
return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
}
- public boolean isRecoveryNeeded() {
- return Iterables.any(getAllClusters(), isRecoveryNeeded);
- }
-
/**
- * Check if _lastRev recovery needed for this cluster node
- * state is Active and currentTime past the leaseEnd time
+ * Returns {@code true} if any of the cluster node info documents satisfies
+ * {@link ClusterNodeInfoDocument#isRecoveryNeeded(long)} where the passed
+ * timestamp is the current time.
+ *
+ * @return {@code true} if any of the cluster nodes need recovery,
+ * {@code false} otherwise.
*/
- public boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) {
- return nodeInfo.isActive() && clock.getTime() > nodeInfo.getLeaseEndTime();
+ public boolean isRecoveryNeeded() {
+ long now = clock.getTime();
+ return StreamSupport.stream(getAllClusters().spliterator(), false)
+ .anyMatch(info -> info != null && info.isRecoveryNeeded(now));
}
- //-------------------------< internal >-------------------------------------
-
/**
- * Acquire a recovery lock for the given cluster node info document
+ * Same as {@link ClusterNodeInfoDocument#isRecoveryNeeded(long)}, called with the current time of this seeker's clock.
*
- * @param info
- * info document of the cluster that is going to be recovered
- * @param recoveredBy
- * id of cluster doing the recovery ({@code 0} when unknown)
- * @return whether the lock has been acquired
+ * @deprecated use {@link ClusterNodeInfoDocument#isRecoveryNeeded(long)}
+ * instead.
*/
- private boolean tryAcquireRecoveryLock(ClusterNodeInfoDocument info,
- int recoveredBy) {
- int clusterId = info.getClusterId();
- try {
- UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
- update.equals(STATE, ACTIVE.name());
- update.equals(LEASE_END_KEY, info.getLeaseEndTime());
- update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
- update.set(REV_RECOVERY_LOCK, ACQUIRED.name());
- if (recoveredBy != 0) {
- update.set(REV_RECOVERY_BY, recoveredBy);
- }
- ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
- if (old != null) {
- LOG.info("Acquired recovery lock for cluster id {}", clusterId);
- }
- return old != null;
- } catch (RuntimeException ex) {
- LOG.error("Failed to acquire the recovery lock for clusterNodeId " + clusterId, ex);
- throw (ex);
- }
- }
-
- /**
- * Checks if the recovering cluster node is inactive and then tries to
- * break the recovery lock.
- *
- * @param doc the cluster node info document of the cluster node to acquire
- * the recovery lock for.
- * @param recoveredBy id of cluster doing the recovery.
- * @return whether the lock has been acquired.
- */
- private boolean tryBreakRecoveryLock(ClusterNodeInfoDocument doc,
- int recoveredBy) {
- Long recoveryBy = doc.getRecoveryBy();
- if (recoveryBy == null) {
- // cannot determine current lock owner
- return false;
- }
- ClusterNodeInfoDocument recovering = getClusterNodeInfo(recoveryBy.intValue());
- if (recovering == null) {
- // cannot determine current lock owner
- return false;
- }
- if (recovering.isActive() && recovering.getLeaseEndTime() > clock.getTime()) {
- // still active, cannot break lock
- return false;
- }
- // try to break the lock
- try {
- UpdateOp update = new UpdateOp(Integer.toString(doc.getClusterId()), false);
- update.equals(STATE, ACTIVE.name());
- update.equals(REV_RECOVERY_LOCK, ACQUIRED.name());
- update.equals(REV_RECOVERY_BY, recoveryBy);
- update.set(REV_RECOVERY_BY, recoveredBy);
- ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, update);
- if (old != null) {
- LOG.info("Acquired (broke) recovery lock for cluster id {}. " +
- "Previous lock owner: {}", doc.getClusterId(), recoveryBy);
- }
- return old != null;
- } catch (RuntimeException ex) {
- LOG.error("Failed to break the recovery lock for clusterNodeId " +
- doc.getClusterId(), ex);
- throw (ex);
- }
+ public boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) {
+ return nodeInfo.isRecoveryNeeded(clock.getTime());
}
}
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java Tue Jul 3 15:25:38 2018
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * This handler gets called back when recovery is needed for a clusterId. An
+ * implementation then tries to perform the recovery and returns whether the
+ * recovery was successful. Upon successful recovery, the clusterId will have
+ * transitioned to the inactive state.
+ */
+interface RecoveryHandler {
+
+ /**
+ * A no-op recovery handler, always returning false.
+ */
+ RecoveryHandler NOOP = clusterId -> false;
+
+ /**
+ * Perform recovery for the given clusterId and return whether the recovery
+ * was successful.
+ *
+ * @param clusterId perform recovery for this clusterId.
+ * @return {@code true} if recovery was successful, {@code false} otherwise.
+ */
+ boolean recover(int clusterId);
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java Tue Jul 3 15:25:38 2018
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
+
+/**
+ * Implements the recovery handler on startup of a {@link DocumentNodeStore}.
+ * <p>
+ * The actual work is delegated to a {@link LastRevRecoveryAgent} that is
+ * created for each call with a {@link RecoveryContext} based on the current
+ * root document of the store.
+ */
+class RecoveryHandlerImpl implements RecoveryHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecoveryHandlerImpl.class);
+
+    private static final int COMMIT_VALUE_CACHE_SIZE = 10000;
+
+    /**
+     * The timeout in milliseconds to wait for the recovery performed by
+     * another cluster node. Read from the system property
+     * {@code oak.recoveryWaitTimeoutMS} when the handler is created and
+     * defaults to 60000 when the property is not set.
+     */
+    private final long recoveryWaitTimeoutMS =
+            Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
+
+    private final DocumentStore store;
+    private final Clock clock;
+    private final MissingLastRevSeeker lastRevSeeker;
+
+    /**
+     * Create a new recovery handler.
+     *
+     * @param store the document store to run recovery on.
+     * @param clock the clock handed to the recovery context.
+     * @param lastRevSeeker the seeker handed to the recovery agent.
+     */
+    RecoveryHandlerImpl(DocumentStore store,
+                        Clock clock,
+                        MissingLastRevSeeker lastRevSeeker) {
+        this.store = store;
+        this.clock = clock;
+        this.lastRevSeeker = lastRevSeeker;
+    }
+
+    /**
+     * Perform recovery for the given clusterId. A
+     * {@link DocumentStoreException} thrown while recovering is logged as a
+     * warning and reported as an unsuccessful recovery.
+     *
+     * @param clusterId perform recovery for this clusterId.
+     * @return {@code true} if recovery was successful, {@code false} otherwise.
+     */
+    @Override
+    public boolean recover(int clusterId) {
+        try {
+            return recoverInternal(clusterId);
+        } catch (DocumentStoreException e) {
+            LOG.warn("Recovery failed for cluster node {}", clusterId, e);
+            return false;
+        }
+    }
+
+    /**
+     * Run the recovery agent for the given clusterId with a deadline of
+     * {@link #recoveryWaitTimeoutMS} milliseconds from now.
+     *
+     * @param clusterId perform recovery for this clusterId.
+     * @return {@code false} if the agent reported {@code -1}, which is
+     *          treated here as "recovery by another cluster node is still
+     *          ongoing"; {@code true} otherwise.
+     * @throws DocumentStoreException if an operation on the store fails.
+     */
+    private boolean recoverInternal(int clusterId)
+            throws DocumentStoreException {
+        NodeDocument root = Utils.getRootDocument(store);
+        // prepare a context for recovery
+        RevisionContext context = new RecoveryContext(
+                root, clock, clusterId,
+                new CachingCommitValueResolver(COMMIT_VALUE_CACHE_SIZE, root::getSweepRevisions));
+        // the last argument is a callback that intentionally does nothing
+        LastRevRecoveryAgent agent = new LastRevRecoveryAgent(
+                store, context, lastRevSeeker, id -> {});
+        long timeout = context.getClock().getTime() + recoveryWaitTimeoutMS;
+        int numRecovered = agent.recover(clusterId, timeout);
+        if (numRecovered == -1) {
+            // look up who currently performs the recovery, for the log message only
+            ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId));
+            String otherId = "n/a";
+            if (doc != null) {
+                otherId = String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
+            }
+            // parameterized logging: the message is only rendered when INFO is enabled
+            LOG.info("This cluster node ({}) requires " +
+                    "_lastRev recovery which is currently performed by " +
+                    "another cluster node ({}). Recovery is " +
+                    "still ongoing after {} ms.",
+                    clusterId, otherId, recoveryWaitTimeoutMS);
+            return false;
+        }
+
+        return true;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryHandlerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java?rev=1834986&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/RecoveryLock.java Tue Jul 3 15:25:38 2018
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_BY;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_LOCK;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.STATE;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
+
+/**
+ * Helper for taking and giving back the recovery lock that is kept on an
+ * entry of the {@link Collection#CLUSTER_NODES} collection.
+ */
+class RecoveryLock {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecoveryLock.class);
+
+    private final DocumentStore store;
+
+    private final Clock clock;
+
+    private final int clusterId;
+
+    /**
+     * Set up a recovery lock for the entry with the given {@code clusterId}.
+     * Nothing is read from the store at this point, hence it is not verified
+     * that an entry for the {@code clusterId} exists or is valid.
+     *
+     * @param store the store where the cluster node entries are stored.
+     * @param clock the clock used to check whether an entry's lease expired.
+     * @param clusterId the {@code clusterId} this lock is created for.
+     */
+    RecoveryLock(DocumentStore store, Clock clock, int clusterId) {
+        this.store = store;
+        this.clock = clock;
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * Try to obtain the recovery lock on the cluster node info entry this
+     * lock was created for. The attempts are made in this order: a regular
+     * acquire, a check whether {@code recoveredBy} already owns the lock and
+     * finally an attempt to break the lock of an owner that is no longer
+     * active or whose lease expired.
+     *
+     * @param recoveredBy id of cluster doing the recovery
+     * @return whether the lock has been acquired
+     */
+    boolean acquireRecoveryLock(int recoveredBy) {
+        ClusterNodeInfoDocument entry = store.find(CLUSTER_NODES, String.valueOf(clusterId));
+        // a missing entry is unexpected; an entry not in need of recovery
+        // does not get locked either
+        if (entry == null || !entry.isRecoveryNeeded(clock.getTime())) {
+            return false;
+        }
+        // acquire directly, or we already own the lock, or we were able to
+        // break the lock of the current owner
+        return tryAcquireRecoveryLock(entry, recoveredBy)
+                || entry.isBeingRecoveredBy(recoveredBy)
+                || tryBreakRecoveryLock(entry, recoveredBy);
+    }
+
+    /**
+     * Give back the recovery lock on the {@code clusterId} of this lock. The
+     * lock state is set to {@code NONE} and the recovering id is cleared in
+     * either case. With {@code success} set to {@code true} the state and the
+     * lease end of the entry are cleared as well. With {@code success} set to
+     * {@code false} the state is left untouched and the lease end is moved
+     * just before the current time, that is, a cluster node which required
+     * recovery will still be active with an expired lease.
+     *
+     * @param success whether recovery was successful.
+     */
+    void releaseRecoveryLock(boolean success) {
+        try {
+            UpdateOp release = new UpdateOp(Integer.toString(clusterId), false);
+            release.set(REV_RECOVERY_LOCK, ClusterNodeInfo.RecoverLockState.NONE.name());
+            release.set(REV_RECOVERY_BY, null);
+            if (!success) {
+                // make sure lease is expired
+                release.set(LEASE_END_KEY, clock.getTime() - 1);
+            } else {
+                release.set(STATE, null);
+                release.set(LEASE_END_KEY, null);
+            }
+            ClusterNodeInfoDocument previous = store.findAndUpdate(CLUSTER_NODES, release);
+            if (previous == null) {
+                throw new RuntimeException("ClusterNodeInfo document for " + clusterId + " missing.");
+            }
+            LOG.info("Released recovery lock for cluster id {} (recovery successful: {})",
+                    clusterId, success);
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to release the recovery lock for clusterNodeId " + clusterId, ex);
+            throw ex;
+        }
+    }
+
+    //-------------------------------< internal >-------------------------------
+
+    /**
+     * Attempt a conditional update that marks the recovery lock as acquired.
+     * The update only applies when the entry is still in the active state,
+     * its lease end is unchanged compared to {@code info} and the lock is
+     * not already acquired.
+     *
+     * @param info
+     *            info document of the cluster that is going to be recovered
+     * @param recoveredBy
+     *            id of cluster doing the recovery ({@code 0} when unknown)
+     * @return whether the lock has been acquired
+     */
+    private boolean tryAcquireRecoveryLock(ClusterNodeInfoDocument info,
+                                           int recoveredBy) {
+        int targetId = info.getClusterId();
+        try {
+            UpdateOp acquire = new UpdateOp(Integer.toString(targetId), false);
+            acquire.equals(STATE, ACTIVE.name());
+            acquire.equals(LEASE_END_KEY, info.getLeaseEndTime());
+            acquire.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
+            acquire.set(REV_RECOVERY_LOCK, ACQUIRED.name());
+            // Extend the lease one time so the recovery gets a chance to
+            // complete when it is run by the very same clusterId. No
+            // background thread renews the lease in that situation.
+            acquire.set(LEASE_END_KEY, clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS);
+            if (recoveredBy != 0) {
+                acquire.set(REV_RECOVERY_BY, recoveredBy);
+            }
+            boolean acquired = store.findAndUpdate(CLUSTER_NODES, acquire) != null;
+            if (acquired) {
+                LOG.info("Acquired recovery lock for cluster id {}", targetId);
+            }
+            return acquired;
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to acquire the recovery lock for clusterNodeId " + targetId, ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * Take over the recovery lock from its current owner, provided the owner
+     * is not active anymore or its lease ended. The take-over is a
+     * conditional update that only applies when the lock is still acquired
+     * by the same owner as recorded in {@code doc}.
+     *
+     * @param doc the cluster node info document of the cluster node to acquire
+     *          the recovery lock for.
+     * @param recoveredBy id of cluster doing the recovery.
+     * @return whether the lock has been acquired.
+     */
+    private boolean tryBreakRecoveryLock(ClusterNodeInfoDocument doc,
+                                         int recoveredBy) {
+        Long currentOwner = doc.getRecoveryBy();
+        if (currentOwner == null) {
+            // cannot determine current lock owner
+            return false;
+        }
+        ClusterNodeInfoDocument ownerDoc = store.find(CLUSTER_NODES, String.valueOf(currentOwner));
+        if (ownerDoc == null) {
+            // cannot determine current lock owner
+            return false;
+        }
+        long now = clock.getTime();
+        long ownerLeaseEnd = ownerDoc.getLeaseEndTime();
+        boolean ownerAlive = ownerDoc.isActive() && ownerLeaseEnd > now;
+        if (ownerAlive) {
+            // still active, cannot break lock
+            return false;
+        }
+        // try to break the lock
+        try {
+            UpdateOp takeOver = new UpdateOp(Integer.toString(doc.getClusterId()), false);
+            takeOver.equals(STATE, ACTIVE.name());
+            takeOver.equals(REV_RECOVERY_LOCK, ACQUIRED.name());
+            takeOver.equals(REV_RECOVERY_BY, currentOwner);
+            takeOver.set(REV_RECOVERY_BY, recoveredBy);
+            // Extend the lease one time so the recovery gets a chance to
+            // complete when it is run by the very same clusterId. No
+            // background thread renews the lease in that situation.
+            takeOver.set(LEASE_END_KEY, clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS);
+            boolean broken = store.findAndUpdate(CLUSTER_NODES, takeOver) != null;
+            if (broken) {
+                LOG.info("Acquired (broke) recovery lock for cluster id {}. " +
+                        "Previous lock owner: {}", doc.getClusterId(), currentOwner);
+            }
+            return broken;
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to break the recovery lock for clusterNodeId " +
+                    doc.getClusterId(), ex);
+            throw ex;
+        }
+    }
+}