You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2016/05/09 11:56:33 UTC

svn commit: r1742930 - in /jackrabbit/oak/branches/1.4: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/

Author: chetanm
Date: Mon May  9 11:56:33 2016
New Revision: 1742930

URL: http://svn.apache.org/viewvc?rev=1742930&view=rev
Log:
OAK-4262 - Provide a way to abort an async indexing run

Merging 1742888,1742916

Modified:
    jackrabbit/oak/branches/1.4/   (props changed)
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Propchange: jackrabbit/oak/branches/1.4/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May  9 11:56:33 2016
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1733615,1733875,1733913,1734230,1734254,1735052,1735405,1735484,1735549,1735564,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737998,1738004,1738775,1738963,1740971,1741032,1741339,1741343,1742520
+/jackrabbit/oak/trunk:1733615,1733875,1733913,1734230,1734254,1735052,1735405,1735484,1735549,1735564,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737998,1738004,1738775,1738963,1740971,1741032,1741339,1741343,1742520,1742888,1742916
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1742930&r1=1742929&r2=1742930&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Mon May  9 11:56:33 2016
@@ -19,9 +19,11 @@ package org.apache.jackrabbit.oak.api.jm
 
 import javax.management.openmbean.CompositeData;
 
+import aQute.bnd.annotation.ProviderType;
 import org.apache.jackrabbit.oak.commons.jmx.Description;
 import org.apache.jackrabbit.oak.commons.jmx.Name;
 
+@ProviderType
 public interface IndexStatsMBean {
 
     String TYPE = "IndexStats";
@@ -32,6 +34,8 @@ public interface IndexStatsMBean {
 
     String STATUS_DONE = "done";
 
+    String STATUS_INTERRUPTED = "interrupted";
+
     /**
      * @return The time the indexing job stared at, or {@code ""} if it is
      *         not currently running.
@@ -70,6 +74,10 @@ public interface IndexStatsMBean {
      */
     void pause();
 
+    @Description("Aborts any running indexing cycle and pauses the indexer. Invoke 'resume' once you are read to " +
+            "resume indexing again")
+    String abortAndPause();
+
     /**
      * Resumes the indexing process. All changes from the previous indexed state
      * will be indexed.

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java?rev=1742930&r1=1742929&r2=1742930&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java Mon May  9 11:56:33 2016
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-@Version("3.2.0")
+@Version("3.3.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.api.jmx;
 

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1742930&r1=1742929&r2=1742930&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon May  9 11:56:33 2016
@@ -296,6 +296,7 @@ public class AsyncIndexUpdate implements
         @Override
         public void indexUpdate() throws CommitFailedException {
             if (forcedStop.get()){
+                forcedStop.set(false);
                 throw INTERRUPTED;
             }
 
@@ -364,6 +365,7 @@ public class AsyncIndexUpdate implements
 
     private void runWhenPermitted() {
         if (indexStats.isPaused()) {
+            log.debug("[{}] Ignoring the run as indexing is paused", name);
             return;
         }
         log.debug("[{}] Running background index task", name);
@@ -701,6 +703,7 @@ public class AsyncIndexUpdate implements
 
         public void failed(Exception e) {
             if (e == INTERRUPTED){
+                status = STATUS_INTERRUPTED;
                 log.info("[{}] The index update interrupted", name);
                 log.debug("[{}] The index update interrupted", name, e);
                 return;
@@ -770,9 +773,26 @@ public class AsyncIndexUpdate implements
         }
 
         @Override
+        public String abortAndPause() {
+            //First pause to avoid any race
+            pause();
+            //Set the forcedStop flag anyway. In resume this would be cleared
+            forcedStopFlag.set(true);
+            String msg = "";
+            //Abort if any indexing run is in progress
+            if (runPermit.availablePermits() == 0){
+                msg = "Abort request placed for current run. ";
+            }
+            return msg + "Indexing is paused now. Invoke 'resume' to resume indexing";
+        }
+
+        @Override
         public void resume() {
             log.debug("[{}] Resuming the async indexer", name);
             this.isPaused = false;
+
+            //Clear the forcedStop flag as fail safe
+            forcedStopFlag.set(false);
         }
 
         @Override

Modified: jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1742930&r1=1742929&r2=1742930&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon May  9 11:56:33 2016
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,6 +51,7 @@ import javax.management.openmbean.Compos
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
 import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
@@ -1194,6 +1196,88 @@ public class AsyncIndexUpdateTest {
         lc.finished();
     }
 
+    @Test
+    public void abortedRun() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final Semaphore asyncLock = new Semaphore(1);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+                return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                        indexStats, stopFlag){
+
+                    @Override
+                    public void indexUpdate() throws CommitFailedException {
+                        try {
+                            asyncLock.acquire();
+                        } catch (InterruptedException ignore) {
+                        }
+                        try {
+                            super.indexUpdate();
+                        }finally {
+                            asyncLock.release();
+                        }
+                    }
+                };
+            }
+        };
+
+        runOneCycle(async);
+        assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus());
+
+        //Below we ensure that we interrupt while the indexing is in progress
+        //hence the use of asyncLock which ensures the abort is called at right time
+
+        //Now make some changes to
+        builder = store.getRoot().builder();
+        builder.child("testRoot2").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        Thread t = new Thread(async);
+        //Lock to ensure that AsyncIndexUpdate waits
+        asyncLock.acquire();
+
+        t.start();
+
+        //Wait till async gets to wait state i.e. inside run
+        while(!asyncLock.hasQueuedThreads());
+
+        assertEquals(IndexStatsMBean.STATUS_RUNNING, async.getIndexStats().getStatus());
+        assertThat(async.getIndexStats().abortAndPause(), containsString("Abort request placed"));
+
+        asyncLock.release();
+
+        retry(5, 5, new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                return IndexStatsMBean.STATUS_INTERRUPTED.equals(async.getIndexStats().getStatus());
+            }
+        });
+
+        //Post abort indexing should be fine
+        runOneCycle(async);
+        assertTrue(async.getIndexStats().isPaused());
+
+        //Now resume indexing
+        async.getIndexStats().resume();
+
+        runOneCycle(async);
+        assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus());
+        assertFalse(async.isClosed());
+    }
+
+
 
     private void assertLogPhrase(List<String> logs, String logPhrase){
         assertThat(logs.toString(), containsString(logPhrase));
@@ -1208,4 +1292,23 @@ public class AsyncIndexUpdateTest {
         return lc;
     }
 
+    private static void retry(int timeoutSeconds, int intervalBetweenTriesMsec, Callable<Boolean> c) {
+        long timeout = System.currentTimeMillis() + timeoutSeconds * 1000L;
+        while (System.currentTimeMillis() < timeout) {
+            try {
+                if (c.call()) {
+                    return;
+                }
+            } catch (Exception ignore) {
+            }
+
+            try {
+                Thread.sleep(intervalBetweenTriesMsec);
+            } catch (InterruptedException ignore) {
+            }
+        }
+
+        fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: ");
+    }
+
 }