You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2016/05/09 07:04:50 UTC
svn commit: r1742888 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/api/jmx/
main/java/org/apache/jackrabbit/oak/plugins/index/
test/java/org/apache/jackrabbit/oak/plugins/index/
Author: chetanm
Date: Mon May 9 07:04:49 2016
New Revision: 1742888
URL: http://svn.apache.org/viewvc?rev=1742888&view=rev
Log:
OAK-4262 - Provide a way to abort an async indexing run
Exposed an abort operation on IndexStatsMBean
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1742888&r1=1742887&r2=1742888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Mon May 9 07:04:49 2016
@@ -19,9 +19,11 @@ package org.apache.jackrabbit.oak.api.jm
import javax.management.openmbean.CompositeData;
+import aQute.bnd.annotation.ProviderType;
import org.apache.jackrabbit.oak.commons.jmx.Description;
import org.apache.jackrabbit.oak.commons.jmx.Name;
+@ProviderType
public interface IndexStatsMBean {
String TYPE = "IndexStats";
@@ -32,6 +34,8 @@ public interface IndexStatsMBean {
String STATUS_DONE = "done";
+ String STATUS_INTERRUPTED = "interrupted";
+
/**
* @return The time the indexing job started at, or {@code ""} if it is
* not currently running.
@@ -70,6 +74,10 @@ public interface IndexStatsMBean {
*/
void pause();
+ /**
+  * Requests that the indexing cycle currently in progress, if any, be
+  * aborted. Only the current cycle is affected.
+  *
+  * @return a human readable message describing the outcome of the request
+  */
+ @Description("Aborts any running indexing cycle. It only affects the current " +
+         "running cycle")
+ String abort();
+
/**
* Resumes the indexing process. All changes from the previous indexed state
* will be indexed.
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java?rev=1742888&r1=1742887&r2=1742888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java Mon May 9 07:04:49 2016
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-@Version("3.2.0")
+@Version("3.3.0")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.api.jmx;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1742888&r1=1742887&r2=1742888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon May 9 07:04:49 2016
@@ -296,6 +296,7 @@ public class AsyncIndexUpdate implements
@Override
public void indexUpdate() throws CommitFailedException {
if (forcedStop.get()){
+ forcedStop.set(false);
throw INTERRUPTED;
}
@@ -701,6 +702,7 @@ public class AsyncIndexUpdate implements
public void failed(Exception e) {
if (e == INTERRUPTED){
+ status = STATUS_INTERRUPTED;
log.info("[{}] The index update interrupted", name);
log.debug("[{}] The index update interrupted", name, e);
return;
@@ -770,6 +772,16 @@ public class AsyncIndexUpdate implements
}
@Override
+ public String abort() {
+     // Zero available run permits is taken to mean that an indexing run
+     // currently holds the permit, i.e. a cycle is in progress.
+     if (runPermit.availablePermits() != 0) {
+         return "No current running indexing found. Nothing to abort!";
+     }
+     // Only raise the flag here; presumably the running cycle observes it on
+     // its next index update and stops on its own.
+     forcedStopFlag.set(true);
+     // Leave a trace of the operator request, in line with resume()
+     log.info("[{}] Abort of the running index update requested", name);
+     return "Abort request placed";
+ }
+
+ @Override
public void resume() {
log.debug("[{}] Resuming the async indexer", name);
this.isPaused = false;
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1742888&r1=1742887&r2=1742888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon May 9 07:04:49 2016
@@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +50,7 @@ import javax.management.openmbean.Compos
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
@@ -1193,6 +1195,82 @@ public class AsyncIndexUpdateTest {
lc.finished();
}
+ /**
+  * Verifies that an indexing cycle can be aborted while it is in progress:
+  * the abort request is accepted, the status switches to
+  * {@link IndexStatsMBean#STATUS_INTERRUPTED}, and a subsequent cycle
+  * completes normally without the indexer being closed.
+  */
+ @Test
+ public void abortedRun() throws Exception{
+     NodeStore store = new MemoryNodeStore();
+     IndexEditorProvider provider = new PropertyIndexEditorProvider();
+     NodeBuilder builder = store.getRoot().builder();
+     createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+             "rootIndex", true, false, ImmutableSet.of("foo"), null)
+             .setProperty(ASYNC_PROPERTY_NAME, "async");
+     builder.child("testRoot").setProperty("foo", "abc");
+
+     store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+     //Single permit: whoever holds it decides when indexUpdate may proceed
+     final Semaphore asyncLock = new Semaphore(1);
+     final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+         @Override
+         protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+                                                              String beforeCheckpoint, String afterCheckpoint,
+                                                              AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+             return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                     indexStats, stopFlag){
+
+                 @Override
+                 public void indexUpdate() throws CommitFailedException {
+                     //Uninterruptible so that a permit is always held when
+                     //release() runs in the finally block below
+                     asyncLock.acquireUninterruptibly();
+                     try {
+                         super.indexUpdate();
+                     } finally {
+                         asyncLock.release();
+                     }
+                 }
+             };
+         }
+     };
+
+     runOneCycle(async);
+     assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus());
+
+     //Below we ensure that we interrupt while the indexing is in progress
+     //hence the use of asyncLock which ensures the abort is called at right time
+
+     //Now make some changes to the repository so that the next cycle has work to do
+     builder = store.getRoot().builder();
+     builder.child("testRoot2").setProperty("foo", "abc");
+     store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+     Thread t = new Thread(async);
+     //Hold the only permit to ensure that AsyncIndexUpdate waits in indexUpdate
+     asyncLock.acquire();
+     try {
+         t.start();
+
+         //Bounded wait till async gets to wait state i.e. blocked on asyncLock.
+         //A plain spin loop would hang the build if that state is never reached
+         retry(5, 5, new Callable<Boolean>() {
+             @Override
+             public Boolean call() throws Exception {
+                 return asyncLock.hasQueuedThreads();
+             }
+         });
+
+         assertEquals(IndexStatsMBean.STATUS_RUNNING, async.getIndexStats().getStatus());
+         assertEquals("Abort request placed",async.getIndexStats().abort());
+     } finally {
+         //Always unblock the indexing thread, even if an assertion above failed
+         asyncLock.release();
+     }
+
+     retry(5, 5, new Callable<Boolean>() {
+         @Override
+         public Boolean call() throws Exception {
+             return IndexStatsMBean.STATUS_INTERRUPTED.equals(async.getIndexStats().getStatus());
+         }
+     });
+
+     //Let the aborted run finish completely before starting another cycle
+     t.join(TimeUnit.SECONDS.toMillis(5));
+     assertFalse(t.isAlive());
+
+     //Post abort indexing should be fine
+     runOneCycle(async);
+     assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus());
+     assertFalse(async.isClosed());
+ }
+
+
private void assertLogPhrase(List<String> logs, String logPhrase){
assertThat(logs.toString(), containsString(logPhrase));
@@ -1207,4 +1285,23 @@ public class AsyncIndexUpdateTest {
return lc;
}
+ private static void retry(int timeoutSeconds, int intervalBetweenTriesMsec, Callable<Boolean> c) {
+ long timeout = System.currentTimeMillis() + timeoutSeconds * 1000L;
+ while (System.currentTimeMillis() < timeout) {
+ try {
+ if (c.call()) {
+ return;
+ }
+ } catch (Exception ignore) {
+ }
+
+ try {
+ Thread.sleep(intervalBetweenTriesMsec);
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: ");
+ }
+
}