You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2020/04/08 07:41:19 UTC
svn commit: r1876276 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java
test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java
Author: thomasm
Date: Wed Apr 8 07:41:19 2020
New Revision: 1876276
URL: http://svn.apache.org/viewvc?rev=1876276&view=rev
Log:
OAK-8997 Index importer: ClusterNodeStoreLock needs a retry logic
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java?rev=1876276&r1=1876275&r2=1876276&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java Wed Apr 8 07:41:19 2020
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.index.importer;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -41,6 +42,8 @@ public class ClusterNodeStoreLock implem
* in between the import process which can take some time
*/
private static final long LOCK_TIMEOUT = TimeUnit.DAYS.toMillis(100);
+ // retry for at most 2 minutes
+ private static final long MAX_RETRY_TIME = 2 * 60 * 1000;
private final Logger log = LoggerFactory.getLogger(getClass());
private final NodeStore nodeStore;
private final Clock clock;
@@ -56,6 +59,10 @@ public class ClusterNodeStoreLock implem
@Override
public ClusteredLockToken lock(String asyncIndexerLane) throws CommitFailedException {
+ // A single attempt (tryLock) can fail; retryIfNeeded repeats it with a
+ // randomized, growing back-off and gives up once MAX_RETRY_TIME has elapsed.
+ return retryIfNeeded(() -> tryLock(asyncIndexerLane));
+ }
+
+ private ClusteredLockToken tryLock(String asyncIndexerLane) throws CommitFailedException {
NodeBuilder builder = nodeStore.getRoot().builder();
NodeBuilder async = builder.child(":async");
@@ -67,7 +74,6 @@ public class ClusterNodeStoreLock implem
"commit to fail. Such a failure should be ignored");
}
- //TODO Attempt few times if merge failure due to current running indexer cycle
async.setProperty(leaseName, leaseEndTime);
async.setProperty(lockName(asyncIndexerLane), true);
NodeStoreUtils.mergeWithConcurrentCheck(nodeStore, builder);
@@ -76,9 +82,42 @@ public class ClusterNodeStoreLock implem
return new ClusteredLockToken(asyncIndexerLane, leaseEndTime);
}
+
+    /**
+     * Runs the given operation and, if it throws, retries it with a randomized
+     * exponential back-off until it succeeds or {@code MAX_RETRY_TIME} has elapsed.
+     *
+     * <p>Each wait is capped at the time remaining in the retry budget, so the
+     * total time spent sleeping cannot exceed {@code MAX_RETRY_TIME}. If the
+     * thread is interrupted while waiting, the interrupt status is restored and
+     * the last failure is reported instead of retrying further.
+     *
+     * @param r the operation to run; it may be called several times
+     * @param <T> the result type of the operation
+     * @return the result of the first successful call
+     * @throws CommitFailedException the last failure, once the retry budget is
+     *         used up or the thread is interrupted; failures of other types
+     *         are wrapped
+     */
+    private <T> T retryIfNeeded(Callable<T> r) throws CommitFailedException {
+        // Attempt few times if merge failure due to current running indexer cycle
+        long backOffMaxMillis = 1;
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                return r.call();
+            } catch (Exception e) {
+                long remaining = MAX_RETRY_TIME - (System.currentTimeMillis() - start);
+                if (remaining < 0) {
+                    throw asCommitFailedException(e);
+                }
+                log.info("Commit failed, retrying: {}", e.toString());
+                // never sleep past the end of the retry budget
+                long sleep = Math.min((long) (backOffMaxMillis * Math.random()), remaining);
+                // cap the back-off so repeated doubling cannot overflow
+                backOffMaxMillis = Math.min(backOffMaxMillis * 2, MAX_RETRY_TIME);
+                log.info("Wait {} ms", sleep);
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException interrupted) {
+                    // keep the interrupt visible to callers and stop retrying;
+                    // continuing would make every following sleep fail at once
+                    Thread.currentThread().interrupt();
+                    e.addSuppressed(interrupted);
+                    throw asCommitFailedException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the failure as a CommitFailedException, wrapping (and logging)
+     * it when it is of a different type.
+     */
+    private CommitFailedException asCommitFailedException(Exception e) {
+        if (e instanceof CommitFailedException) {
+            return (CommitFailedException) e;
+        }
+        log.error("Unexpected failure retrying", e);
+        return new CommitFailedException(CommitFailedException.UNSUPPORTED, 2, e.getMessage(), e);
+    }
@Override
public void unlock(ClusteredLockToken token) throws CommitFailedException {
+ // Releasing goes through the same retry loop as lock(): a failed
+ // tryUnlock attempt is repeated by retryIfNeeded. The Void result is unused.
+ retryIfNeeded(() -> tryUnlock(token));
+ }
+
+ private Void tryUnlock(ClusteredLockToken token) throws CommitFailedException {
String leaseName = AsyncIndexUpdate.leasify(token.laneName);
NodeBuilder builder = nodeStore.getRoot().builder();
@@ -87,6 +126,7 @@ public class ClusterNodeStoreLock implem
async.removeProperty(lockName(token.laneName));
NodeStoreUtils.mergeWithConcurrentCheck(nodeStore, builder);
log.info("Remove the lock for async indexer lane [{}]", token.laneName);
+ return null;
}
public boolean isLocked(String asyncIndexerLane) {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java?rev=1876276&r1=1876275&r2=1876276&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java Wed Apr 8 07:41:19 2020
@@ -19,10 +19,18 @@
package org.apache.jackrabbit.oak.plugins.index.importer;
-import com.google.common.collect.ImmutableSet;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
-import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -33,10 +41,7 @@ import org.apache.jackrabbit.oak.spi.sta
import org.junit.Before;
import org.junit.Test;
-import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
-import static org.junit.Assert.*;
+import com.google.common.collect.ImmutableSet;
public class ClusterNodeStoreLockTest {
private NodeStore store = new MemoryNodeStore();
@@ -53,6 +58,36 @@ public class ClusterNodeStoreLockTest {
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
+
+    /**
+     * Starts 100 threads that each acquire and release the "async" lane lock
+     * 100 times against the shared store, and fails with the first recorded
+     * error if any attempt threw.
+     */
+    @Test
+    public void lockConcurrently() throws Exception {
+        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+        // one worker body shared by all threads: 100 lock/unlock round trips
+        Runnable lockUnlockLoop = () -> {
+            for (int round = 0; round < 100; round++) {
+                try {
+                    ClusterNodeStoreLock lock = new ClusterNodeStoreLock(store);
+                    ClusteredLockToken token = lock.lock("async");
+                    lock.unlock(token);
+                } catch (Throwable failure) {
+                    failures.add(failure);
+                }
+            }
+        };
+        List<Thread> workers = new ArrayList<>();
+        for (int n = 0; n < 100; n++) {
+            Thread worker = new Thread(lockUnlockLoop);
+            worker.start();
+            workers.add(worker);
+        }
+        for (Thread worker : workers) {
+            worker.join();
+        }
+        if (!failures.isEmpty()) {
+            throw new RuntimeException(failures.get(0));
+        }
+    }
@Test
public void locking() throws Exception{