You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2016/01/29 11:06:36 UTC
svn commit: r1727508 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/
oak-core/src/test/java/org/apache/jackrabbit/oak/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/
oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/
Author: chetanm
Date: Fri Jan 29 10:06:35 2016
New Revision: 1727508
URL: http://svn.apache.org/viewvc?rev=1727508&view=rev
Log:
OAK-3923 - Async indexing delayed by 30 minutes because stop order is incorrect
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1727508&r1=1727507&r2=1727508&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Fri Jan 29 10:06:35 2016
@@ -642,6 +642,7 @@ public class Oak {
AsyncIndexUpdate task = new AsyncIndexUpdate(t.getKey(), store,
indexEditors);
indexRegistration.registerAsyncIndexer(task, t.getValue());
+ closer.register(task);
}
PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1727508&r1=1727507&r2=1727508&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Fri Jan 29 10:06:35 2016
@@ -27,10 +27,13 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
+import java.io.Closeable;
import java.util.Calendar;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -74,7 +77,7 @@ import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
-public class AsyncIndexUpdate implements Runnable {
+public class AsyncIndexUpdate implements Runnable, Closeable {
private static final Logger log = LoggerFactory
.getLogger(AsyncIndexUpdate.class);
@@ -90,6 +93,9 @@ public class AsyncIndexUpdate implements
private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
"Async", 1, "Concurrent update detected");
+ private static final CommitFailedException INTERRUPTED = new CommitFailedException(
+ "Async", 1, "Indexing stopped forcefully");
+
/**
* Timeout in milliseconds after which an async job would be considered as
* timed out. Another node in cluster would wait for timeout before
@@ -139,6 +145,16 @@ public class AsyncIndexUpdate implements
private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();
+ private final Semaphore runPermit = new Semaphore(1);
+
+ /**
+ * Flag which is set to true if the close operation is not able to
+ * complete within the soft timeout. The flag is an indication to
+ * the indexing thread to return straight away, e.g. by throwing
+ * an exception.
+ */
+ private final AtomicBoolean forcedStopFlag = new AtomicBoolean();
+
private IndexMBeanRegistration mbeanRegistration;
private long leaseTimeOut;
@@ -151,6 +167,14 @@ public class AsyncIndexUpdate implements
private static long ERROR_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
.getInteger("oak.async.warn.interval", 30));
+ /**
+ * Timeout in seconds for which close call would wait before forcefully
+ * stopping the indexing thread
+ */
+ private int softTimeOutSecs = Integer.getInteger("oak.async.softTimeOutSecs", 2 * 60);
+
+ private boolean closed;
+
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@Nonnull IndexEditorProvider provider, boolean switchOnSync) {
this.name = checkNotNull(name);
@@ -194,14 +218,17 @@ public class AsyncIndexUpdate implements
private final AsyncIndexStats indexStats;
+ private final AtomicBoolean forcedStop;
+
/** Expiration time of the last lease we committed */
private long lease;
public AsyncUpdateCallback(NodeStore store, String name,
long leaseTimeOut, String checkpoint, String afterCheckpoint,
- AsyncIndexStats indexStats) {
+ AsyncIndexStats indexStats, AtomicBoolean forcedStop) {
this.store = store;
this.name = name;
+ this.forcedStop = forcedStop;
this.leaseTimeOut = leaseTimeOut;
this.checkpoint = checkpoint;
this.afterCheckpoint = afterCheckpoint;
@@ -268,6 +295,10 @@ public class AsyncIndexUpdate implements
@Override
public void indexUpdate() throws CommitFailedException {
+ if (forcedStop.get()){
+ throw INTERRUPTED;
+ }
+
if (indexStats.incUpdates() % 100 == 0) {
long now = System.currentTimeMillis();
if (now + leaseTimeOut > lease) {
@@ -283,6 +314,52 @@ public class AsyncIndexUpdate implements
@Override
public synchronized void run() {
+ boolean permitAcquired = false;
+ try{
+ if (runPermit.tryAcquire()){
+ permitAcquired = true;
+ runWhenPermitted();
+ } else {
+ log.warn("[{}] Could not acquire run permit. Stop flag set to [{}] Skipping the run", name, forcedStopFlag);
+ }
+ } finally {
+ if (permitAcquired){
+ runPermit.release();
+ }
+ }
+ }
+
+
+ @Override
+ public void close() {
+ int hardTimeOut = 5 * softTimeOutSecs;
+ if(!runPermit.tryAcquire()){
+ //First let current run complete without bothering it
+ log.debug("[{}] [WAITING] Indexing in progress. Would wait for {} secs for it to finish", name, softTimeOutSecs);
+ try {
+ if(!runPermit.tryAcquire(softTimeOutSecs, TimeUnit.SECONDS)){
+ //We have now waited enough. So signal the indexer that it should return right away
+ //as soon as it sees the forcedStopFlag
+ log.debug("[{}] [SOFT LIMIT HIT] Indexing found to be in progress for more than [{}]s. Would " +
+ "signal it to now force stop", name, softTimeOutSecs);
+ forcedStopFlag.set(true);
+ if(!runPermit.tryAcquire(hardTimeOut, TimeUnit.SECONDS)){
+ //Index thread did not listen to our advice. So give up now and warn about it
+ log.warn("[{}] Indexing still not found to be complete. Giving up after [{}]s", name, hardTimeOut);
+ }
+ } else {
+ log.info("[{}] [CLOSED OK] Async indexing run completed. Closing it now", name);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ log.info("[{}] Closed", name);
+ }
+ closed = true;
+ }
+
+ private void runWhenPermitted() {
if (indexStats.isPaused()) {
return;
}
@@ -396,10 +473,11 @@ public class AsyncIndexUpdate implements
}
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
- String name, long leaseTimeOut, String beforeCheckpoint,
- String afterCheckpoint, AsyncIndexStats indexStats) {
+ String name, long leaseTimeOut, String beforeCheckpoint,
+ String afterCheckpoint, AsyncIndexStats indexStats,
+ AtomicBoolean stopFlag) {
return new AsyncUpdateCallback(store, name, leaseTimeOut,
- beforeCheckpoint, afterCheckpoint, indexStats);
+ beforeCheckpoint, afterCheckpoint, indexStats, stopFlag);
}
private boolean updateIndex(NodeState before, String beforeCheckpoint,
@@ -411,7 +489,7 @@ public class AsyncIndexUpdate implements
// create an update callback for tracking index updates
// and maintaining the update lease
AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
- leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats);
+ leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, forcedStopFlag);
callback.prepare();
// check for index tasks split requests, if a split happened, make
@@ -535,6 +613,19 @@ public class AsyncIndexUpdate implements
return this;
}
+ protected AsyncIndexUpdate setCloseTimeOut(int timeOutInSec) {
+ this.softTimeOutSecs = timeOutInSec;
+ return this;
+ }
+
+ public boolean isClosed(){
+ return closed || forcedStopFlag.get();
+ }
+
+ boolean isClosing(){
+ return runPermit.hasQueuedThreads();
+ }
+
private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
stats.start(now());
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java?rev=1727508&r1=1727507&r2=1727508&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java Fri Jan 29 10:06:35 2016
@@ -17,19 +17,28 @@
package org.apache.jackrabbit.oak;
import java.io.Closeable;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jcr.NoSuchWorkspaceException;
import org.apache.jackrabbit.oak.api.ContentRepository;
import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -118,4 +127,28 @@ public class OakTest {
externalExecutor.shutdown();
}
+ @Test
+ public void closeAsyncIndexers() throws Exception{
+ final AtomicReference<AsyncIndexUpdate> async = new AtomicReference<AsyncIndexUpdate>();
+ Whiteboard wb = new DefaultWhiteboard(){
+ @Override
+ public <T> Registration register(Class<T> type, T service, Map<?, ?> properties) {
+ if (service instanceof AsyncIndexUpdate){
+ async.set((AsyncIndexUpdate) service);
+ }
+ return super.register(type, service, properties);
+ }
+ };
+ Oak oak = new Oak()
+ .with(new OpenSecurityProvider())
+ .with(wb)
+ .withAsyncIndexing("foo", 5);
+ ContentRepository repo = oak.createContentRepository();
+
+ ((Closeable)repo).close();
+ assertNotNull(async.get());
+ assertTrue(async.get().isClosed());
+ assertNull(WhiteboardUtils.getService(wb, AsyncIndexUpdate.class));
+ }
+
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1727508&r1=1727507&r2=1727508&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Fri Jan 29 10:06:35 2016
@@ -23,15 +23,19 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
@@ -1027,4 +1031,178 @@ public class AsyncIndexUpdateTest {
customLogs.finished();
}
+ @Test
+ public void noRunWhenClosed() throws Exception{
+ NodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+ async.run();
+
+ async.close();
+ LogCustomizer lc = createLogCustomizer(Level.WARN);
+ async.run();
+ assertEquals(1, lc.getLogs().size());
+ assertThat(lc.getLogs().get(0), containsString("Could not acquire run permit"));
+
+ lc.finished();
+ }
+
+ @Test
+ public void closeWithSoftLimit() throws Exception{
+ NodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ final Semaphore asyncLock = new Semaphore(1);
+ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+ @Override
+ protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+ String beforeCheckpoint, String afterCheckpoint,
+ AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+ try {
+ asyncLock.acquire();
+ } catch (InterruptedException ignore) {
+ }
+ return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+ indexStats, stopFlag);
+ }
+ };
+
+ async.setCloseTimeOut(1000);
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ async.run();
+ }
+ });
+
+ Thread closer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ async.close();
+ }
+ });
+
+ asyncLock.acquire();
+ t.start();
+
+ //Wait till async gets to wait state i.e. inside run
+ while(!asyncLock.hasQueuedThreads());
+
+ LogCustomizer lc = createLogCustomizer(Level.DEBUG);
+ closer.start();
+
+ //Wait till closer is in waiting state
+ while(!async.isClosing());
+
+ //For softLimit case the flag should not be set
+ assertFalse(async.isClosed());
+ assertLogPhrase(lc.getLogs(), "[WAITING]");
+
+ //Let indexing run complete now
+ asyncLock.release();
+
+ //Wait for both threads
+ t.join();
+ closer.join();
+
+ //Close call should complete
+ assertLogPhrase(lc.getLogs(), "[CLOSED OK]");
+ }
+
+ @Test
+ public void closeWithHardLimit() throws Exception{
+ NodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ final Semaphore asyncLock = new Semaphore(1);
+ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+ @Override
+ protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+ String beforeCheckpoint, String afterCheckpoint,
+ AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+ try {
+ asyncLock.acquire();
+ } catch (InterruptedException ignore) {
+ }
+ return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+ indexStats, stopFlag);
+ }
+ };
+
+ //Set a 1 sec close timeout
+ async.setCloseTimeOut(1);
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ async.run();
+ }
+ });
+
+ Thread closer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ async.close();
+ }
+ });
+
+ //Lock to ensure that AsyncIndexUpdate waits
+ asyncLock.acquire();
+
+ t.start();
+
+ //Wait till async gets to wait state i.e. inside run
+ while(!asyncLock.hasQueuedThreads());
+
+ LogCustomizer lc = createLogCustomizer(Level.DEBUG);
+ closer.start();
+
+ //Wait till stopFlag is set
+ while(!async.isClosed());
+
+ assertLogPhrase(lc.getLogs(), "[SOFT LIMIT HIT]");
+
+ //Let indexing run complete now
+ asyncLock.release();
+
+ //Wait for both threads
+ t.join();
+
+ //Async run would have failed with exception
+ assertNotNull(async.getIndexStats().getLatestError());
+
+ //Wait for close call to complete
+ closer.join();
+ }
+
+
+ private void assertLogPhrase(List<String> logs, String logPhrase){
+ assertThat(logs.toString(), containsString(logPhrase));
+ }
+
+ private static LogCustomizer createLogCustomizer(Level level){
+ LogCustomizer lc = LogCustomizer.forLogger(AsyncIndexUpdate.class.getName())
+ .filter(level)
+ .enable(level)
+ .create();
+ lc.starting();
+ return lc;
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1727508&r1=1727507&r2=1727508&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (original)
+++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Fri Jan 29 10:06:35 2016
@@ -399,10 +399,11 @@ public class AsyncIndexUpdateLeaseTest e
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
- String name, long leaseTimeOut, String checkpoint,
- String afterCheckpoint, AsyncIndexStats indexStats) {
+ String name, long leaseTimeOut, String checkpoint,
+ String afterCheckpoint, AsyncIndexStats indexStats,
+ AtomicBoolean stopFlag) {
return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
- checkpoint, afterCheckpoint, indexStats, listener);
+ checkpoint, afterCheckpoint, indexStats, stopFlag, listener);
}
}
@@ -411,10 +412,9 @@ public class AsyncIndexUpdateLeaseTest e
private IndexStatusListener listener;
public SpecialAsyncUpdateCallback(NodeStore store, String name,
- long leaseTimeOut, String checkpoint, String afterCheckpoint,
- AsyncIndexStats indexStats, IndexStatusListener listener) {
- super(store, name, leaseTimeOut, checkpoint, afterCheckpoint,
- indexStats);
+ long leaseTimeOut, String checkpoint, String afterCheckpoint,
+ AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) {
+ super(store, name, leaseTimeOut, checkpoint, afterCheckpoint, indexStats, stopFlag);
this.listener = listener;
}