You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2016/02/09 07:04:06 UTC

svn commit: r1729314 - in /jackrabbit/oak/branches/1.2: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/test/java/org/apache/jackrabbit/oak/ oak-core/src/test/java/org/ap...

Author: chetanm
Date: Tue Feb  9 06:04:05 2016
New Revision: 1729314

URL: http://svn.apache.org/viewvc?rev=1729314&view=rev
Log:
OAK-3923 - Async indexing delayed by 30 minutes because stop order is incorrect

Merging 1727508, 1727893

Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  9 06:04:05 2016
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681921,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618
 ,1684669,1684820,1684868,1684894,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686003,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693050,1693209,1693401,1693421,1693525-1693526,1694007,1694393-1694394,1694651,1694653-1694654,1695032,1695050,1695122,1695280,1695299,1695420,1695457,1695482,1695492,1695
 507,1695521,1695540,1695571,1695905,1696190,1696194,1696242,1696285,1696375,1696522,1696578,1696759,1696916,1697363,1697373,1697410,1697582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700718,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701907,1701948,1701955,1701959,1701965,1701986,1702014,1702022,1702045,1702051,1702241,1702272,1702371,1702387,1702405,1702423,1702426,1702428,1702860,1702866,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705005,1705027,1705043,1705055,1705250,1705268,1705273,1705323,1705677,1705701,1705871,1705992,1705998,1706009,1706037,1706059,1706212,1706218,1706270,1706764,1706772,1707049,1707191,1707331,1707435,1707509,1708105,1708315,1708546,1708592,1708766,1709012,1709852,1709978,1710013,1710031,1710049,1710205,1710242,1
 710559,1710575,1710590,1710614,1710637,1710789,1710800,1710811,1710816,1710972,1711248,1711282,1711296,1711405,1711498,1711654,1712018,1712042,1712319,1712490,1712531,1712730,1712785,1712963,1713008,1713439,1713461,1713580,1713586,1713599-1713600,1713626,1713698,1713803,1713809,1714034,1714061,1714084,1714170,1714213,1714229,1714238,1714519-1714520,1714543-1714544,1714730,1714739,1714779,1714956,1714961,1715010,1715092,1715191,1715346,1715767,1715771,1715888,1715898,1716178,1716426,1716576,1716596,1716616,1716703,1716712,1716815,1716823,1716830,1716883,1717277,1717462,1717632,1717784,1717789,1717988,1718528,1718533,1718547-1718548,1718626,1718646,1718772,1718801-1718802,1718895,1719111,1719288,1719869,1720335,1720350,1720354,1720500,1721160,1721172,1721337,1722141,1722832,1723227,1723239,1723241,1723251,1723254,1723333,1723347,1723350,1723584,1723713,1723731,1724026,1724057,1724186,1724210,1724401,1724628,1724631,1725216,1725477,1725555,1725960,1726232,1726570,1726579,1726585-172658
 6,1726621,1726795,1726797,1726809,1726812,1726981,1726993,1727026,1727254,1727331,1727350,1727358,1727429,1727476,1727515-1727518,1728037,1728041,1728070,1728114,1728281,1728443,1728642,1729200
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681921,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618
 ,1684669,1684820,1684868,1684894,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686003,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693050,1693209,1693401,1693421,1693525-1693526,1694007,1694393-1694394,1694651,1694653-1694654,1695032,1695050,1695122,1695280,1695299,1695420,1695457,1695482,1695492,1695
 507,1695521,1695540,1695571,1695905,1696190,1696194,1696242,1696285,1696375,1696522,1696578,1696759,1696916,1697363,1697373,1697410,1697582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700718,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701907,1701948,1701955,1701959,1701965,1701986,1702014,1702022,1702045,1702051,1702241,1702272,1702371,1702387,1702405,1702423,1702426,1702428,1702860,1702866,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705005,1705027,1705043,1705055,1705250,1705268,1705273,1705323,1705677,1705701,1705871,1705992,1705998,1706009,1706037,1706059,1706212,1706218,1706270,1706764,1706772,1707049,1707191,1707331,1707435,1707509,1708105,1708315,1708546,1708592,1708766,1709012,1709852,1709978,1710013,1710031,1710049,1710205,1710242,1
 710559,1710575,1710590,1710614,1710637,1710789,1710800,1710811,1710816,1710972,1711248,1711282,1711296,1711405,1711498,1711654,1712018,1712042,1712319,1712490,1712531,1712730,1712785,1712963,1713008,1713439,1713461,1713580,1713586,1713599-1713600,1713626,1713698,1713803,1713809,1714034,1714061,1714084,1714170,1714213,1714229,1714238,1714519-1714520,1714543-1714544,1714730,1714739,1714779,1714956,1714961,1715010,1715092,1715191,1715346,1715767,1715771,1715888,1715898,1716178,1716426,1716576,1716596,1716616,1716703,1716712,1716815,1716823,1716830,1716883,1717277,1717462,1717632,1717784,1717789,1717988,1718528,1718533,1718547-1718548,1718626,1718646,1718772,1718801-1718802,1718895,1719111,1719288,1719869,1720335,1720350,1720354,1720500,1721160,1721172,1721337,1722141,1722832,1723227,1723239,1723241,1723251,1723254,1723333,1723347,1723350,1723584,1723713,1723731,1724026,1724057,1724186,1724210,1724401,1724628,1724631,1725216,1725477,1725555,1725960,1726232,1726570,1726579,1726585-172658
 6,1726621,1726795,1726797,1726809,1726812,1726981,1726993,1727026,1727254,1727331,1727350,1727358,1727429,1727476,1727508,1727515-1727518,1727893,1728037,1728041,1728070,1728114,1728281,1728443,1728642,1729200
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1729314&r1=1729313&r2=1729314&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Tue Feb  9 06:04:05 2016
@@ -140,6 +140,8 @@ public class Oak {
 
     private Executor executor;
 
+    private final Closer closer = Closer.create();
+
     /**
      * Default {@code ScheduledExecutorService} used for scheduling background tasks.
      * This default spawns up to 32 background thread on an as need basis. Idle
@@ -301,7 +303,7 @@ public class Oak {
     /**
      * Flag controlling the asynchronous indexing behavior. If false (default)
      * there will be no background indexing happening.
-     * 
+     *
      */
     private boolean asyncIndexing = false;
 
@@ -537,7 +539,7 @@ public class Oak {
             // Register AsyncIndexStats for execution stats update
             regs.add(
                 scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1, false));
-
+            closer.register(task);
             PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(
                     new AsyncIndexUpdate(IndexConstants.ASYNC_REINDEX_VALUE,
                             store, indexEditors, true), getExecutor());
@@ -545,7 +547,7 @@ public class Oak {
                     PropertyIndexAsyncReindexMBean.class, asyncPI,
                     PropertyIndexAsyncReindexMBean.TYPE, name));
         }
-        
+
         regs.add(registerMBean(whiteboard, NodeCounterMBean.class,
                 new NodeCounter(store), NodeCounterMBean.TYPE, "nodeCounter"));
 
@@ -590,6 +592,7 @@ public class Oak {
                 super.close();
                 repoStateCheckHook.close();
                 new CompositeRegistration(regs).unregister();
+                closer.close();
             }
         };
     }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1729314&r1=1729313&r2=1729314&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Feb  9 06:04:05 2016
@@ -27,10 +27,13 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
 
+import java.io.Closeable;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -73,7 +76,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
-public class AsyncIndexUpdate implements Runnable {
+public class AsyncIndexUpdate implements Runnable, Closeable {
 
     private static final Logger log = LoggerFactory
             .getLogger(AsyncIndexUpdate.class);
@@ -89,6 +92,9 @@ public class AsyncIndexUpdate implements
     private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
             "Async", 1, "Concurrent update detected");
 
+    private static final CommitFailedException INTERRUPTED = new CommitFailedException(
+            "Async", 1, "Indexing stopped forcefully");
+
     /**
      * Timeout in milliseconds after which an async job would be considered as
      * timed out. Another node in cluster would wait for timeout before
@@ -136,6 +142,16 @@ public class AsyncIndexUpdate implements
 
     private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy();
 
+    private final Semaphore runPermit = new Semaphore(1);
+
+    /**
+     * Flag which would be set to true if the close operation is not
+     * able to close within specific time. The flag would be an
+     * indication to indexing thread to return straightway say by
+     * throwing an exception
+     */
+    private final AtomicBoolean forcedStopFlag = new AtomicBoolean();
+
     private long leaseTimeOut;
 
     /**
@@ -146,6 +162,14 @@ public class AsyncIndexUpdate implements
     private static long ERROR_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
             .getInteger("oak.async.warn.interval", 30));
 
+    /**
+     * Timeout in seconds for which close call would wait before forcefully
+     * stopping the indexing thread
+     */
+    private int softTimeOutSecs = Integer.getInteger("oak.async.softTimeOutSecs", 2 * 60);
+
+    private boolean closed;
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
@@ -189,14 +213,17 @@ public class AsyncIndexUpdate implements
 
         private final AsyncIndexStats indexStats;
 
+        private final AtomicBoolean forcedStop;
+
         /** Expiration time of the last lease we committed */
         private long lease;
 
         public AsyncUpdateCallback(NodeStore store, String name,
                 long leaseTimeOut, String checkpoint, String afterCheckpoint,
-                AsyncIndexStats indexStats) {
+                AsyncIndexStats indexStats, AtomicBoolean forcedStop) {
             this.store = store;
             this.name = name;
+            this.forcedStop = forcedStop;
             this.leaseTimeOut = leaseTimeOut;
             this.checkpoint = checkpoint;
             this.afterCheckpoint = afterCheckpoint;
@@ -271,6 +298,10 @@ public class AsyncIndexUpdate implements
 
         @Override
         public void indexUpdate() throws CommitFailedException {
+            if (forcedStop.get()){
+                throw INTERRUPTED;
+            }
+
             if (indexStats.incUpdates() % 100 == 0) {
                 long now = System.currentTimeMillis();
                 if (now + leaseTimeOut > lease) {
@@ -286,6 +317,55 @@ public class AsyncIndexUpdate implements
 
     @Override
     public synchronized void run() {
+        boolean permitAcquired = false;
+        try{
+            if (runPermit.tryAcquire()){
+                permitAcquired = true;
+                runWhenPermitted();
+            } else {
+                log.warn("[{}] Could not acquire run permit. Stop flag set to [{}] Skipping the run", name, forcedStopFlag);
+            }
+        } finally {
+            if (permitAcquired){
+                runPermit.release();
+            }
+        }
+    }
+
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        int hardTimeOut = 5 * softTimeOutSecs;
+        if(!runPermit.tryAcquire()){
+            //First let current run complete without bothering it
+            log.debug("[{}] [WAITING] Indexing in progress. Would wait for {} secs for it to finish", name, softTimeOutSecs);
+            try {
+                if(!runPermit.tryAcquire(softTimeOutSecs, TimeUnit.SECONDS)){
+                    //We have now waited enough. So signal the indexer that it should return right away
+                    //as soon as it sees the forcedStopFlag
+                    log.debug("[{}] [SOFT LIMIT HIT] Indexing found to be in progress for more than [{}]s. Would " +
+                            "signal it to now force stop", name, softTimeOutSecs);
+                    forcedStopFlag.set(true);
+                    if(!runPermit.tryAcquire(hardTimeOut, TimeUnit.SECONDS)){
+                        //Index thread did not listened to our advice. So give up now and warn about it
+                        log.warn("[{}] Indexing still not found to be complete. Giving up after [{}]s", name, hardTimeOut);
+                    }
+                } else {
+                    log.info("[{}] [CLOSED OK] Async indexing run completed. Closing it now", name);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } else {
+            log.info("[{}] Closed", name);
+        }
+        closed = true;
+    }
+
+    private void runWhenPermitted() {
         if (indexStats.isPaused()) {
             return;
         }
@@ -382,10 +462,11 @@ public class AsyncIndexUpdate implements
     }
 
     protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
-            String name, long leaseTimeOut, String beforeCheckpoint,
-            String afterCheckpoint, AsyncIndexStats indexStats) {
+                                                         String name, long leaseTimeOut, String beforeCheckpoint,
+                                                         String afterCheckpoint, AsyncIndexStats indexStats,
+                                                         AtomicBoolean stopFlag) {
         return new AsyncUpdateCallback(store, name, leaseTimeOut,
-                beforeCheckpoint, afterCheckpoint, indexStats);
+                beforeCheckpoint, afterCheckpoint, indexStats, stopFlag);
     }
 
     private boolean updateIndex(NodeState before, String beforeCheckpoint,
@@ -397,7 +478,7 @@ public class AsyncIndexUpdate implements
         // create an update callback for tracking index updates
         // and maintaining the update lease
         AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
-                leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats);
+                leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, forcedStopFlag);
         callback.prepare();
         try {
             NodeBuilder builder = store.getRoot().builder();
@@ -515,6 +596,19 @@ public class AsyncIndexUpdate implements
         return this;
     }
 
+    protected AsyncIndexUpdate setCloseTimeOut(int timeOutInSec) {
+        this.softTimeOutSecs = timeOutInSec;
+        return this;
+    }
+
+    public boolean isClosed(){
+        return closed || forcedStopFlag.get();
+    }
+
+    boolean isClosing(){
+        return runPermit.hasQueuedThreads();
+    }
+
     private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
         stats.start(now());
     }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java?rev=1729314&r1=1729313&r2=1729314&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/OakTest.java Tue Feb  9 06:04:05 2016
@@ -16,14 +16,26 @@
  */
 package org.apache.jackrabbit.oak;
 
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
 import javax.jcr.NoSuchWorkspaceException;
 
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
 import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -65,4 +77,28 @@ public class OakTest {
 
     }
 
+    @Test
+    public void closeAsyncIndexers() throws Exception{
+        final AtomicReference<AsyncIndexUpdate> async = new AtomicReference<AsyncIndexUpdate>();
+        Whiteboard wb = new DefaultWhiteboard(){
+            @Override
+            public <T> Registration register(Class<T> type, T service, Map<?, ?> properties) {
+                if (service instanceof AsyncIndexUpdate){
+                    async.set((AsyncIndexUpdate) service);
+                }
+                return super.register(type, service, properties);
+            }
+        };
+        Oak oak = new Oak()
+                .with(new OpenSecurityProvider())
+                .with(wb)
+                .withAsyncIndexing();
+        ContentRepository repo = oak.createContentRepository();
+
+        ((Closeable)repo).close();
+        assertNotNull(async.get());
+        assertTrue(async.get().isClosed());
+        assertNull(WhiteboardUtils.getService(wb, AsyncIndexUpdate.class));
+    }
+
 }
\ No newline at end of file

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1729314&r1=1729313&r2=1729314&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Tue Feb  9 06:04:05 2016
@@ -399,10 +399,11 @@ public class AsyncIndexUpdateLeaseTest e
 
         @Override
         protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
-                String name, long leaseTimeOut, String checkpoint,
-                String afterCheckpoint, AsyncIndexStats indexStats) {
+                                                             String name, long leaseTimeOut, String checkpoint,
+                                                             String afterCheckpoint, AsyncIndexStats indexStats,
+                                                             AtomicBoolean stopFlag) {
             return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
-                    checkpoint, afterCheckpoint, indexStats, listener);
+                    checkpoint, afterCheckpoint, indexStats, stopFlag, listener);
         }
     }
 
@@ -411,10 +412,9 @@ public class AsyncIndexUpdateLeaseTest e
         private IndexStatusListener listener;
 
         public SpecialAsyncUpdateCallback(NodeStore store, String name,
-                long leaseTimeOut, String checkpoint, String afterCheckpoint,
-                AsyncIndexStats indexStats, IndexStatusListener listener) {
-            super(store, name, leaseTimeOut, checkpoint, afterCheckpoint,
-                    indexStats);
+                                          long leaseTimeOut, String checkpoint, String afterCheckpoint,
+                                          AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) {
+            super(store, name, leaseTimeOut, checkpoint, afterCheckpoint, indexStats, stopFlag);
             this.listener = listener;
         }
 

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1729314&r1=1729313&r2=1729314&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Tue Feb  9 06:04:05 2016
@@ -23,15 +23,19 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
 import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
@@ -903,5 +907,180 @@ public class AsyncIndexUpdateTest {
         customLogs.finished();
     }
 
+    @Test
+    public void noRunWhenClosed() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+
+        async.close();
+        LogCustomizer lc = createLogCustomizer(Level.WARN);
+        async.run();
+        assertEquals(1, lc.getLogs().size());
+        assertThat(lc.getLogs().get(0), containsString("Could not acquire run permit"));
+
+        lc.finished();
+
+        async.close();
+    }
+
+    @Test
+    public void closeWithSoftLimit() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final Semaphore asyncLock = new Semaphore(1);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+                try {
+                    asyncLock.acquire();
+                } catch (InterruptedException ignore) {
+                }
+                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                        indexStats, stopFlag);
+            }
+        };
+
+        async.setCloseTimeOut(1000);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.run();
+            }
+        });
+
+        Thread closer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.close();
+            }
+        });
+
+        asyncLock.acquire();
+        t.start();
+
+        //Wait till async gets to wait state i.e. inside run
+        while(!asyncLock.hasQueuedThreads());
+
+        LogCustomizer lc = createLogCustomizer(Level.DEBUG);
+        closer.start();
+
+        //Wait till closer is in waiting state
+        while(!async.isClosing());
+
+        //For softLimit case the flag should not be set
+        assertFalse(async.isClosed());
+        assertLogPhrase(lc.getLogs(), "[WAITING]");
+
+        //Let indexing run complete now
+        asyncLock.release();
+
+        //Wait for both threads
+        t.join();
+        closer.join();
+
+        //Close call should complete
+        assertLogPhrase(lc.getLogs(), "[CLOSED OK]");
+    }
+
+    @Test
+    public void closeWithHardLimit() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final Semaphore asyncLock = new Semaphore(1);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+                try {
+                    asyncLock.acquire();
+                } catch (InterruptedException ignore) {
+                }
+                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                        indexStats, stopFlag);
+            }
+        };
+
+        //Set a 1 sec close timeout
+        async.setCloseTimeOut(1);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.run();
+            }
+        });
+
+        Thread closer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.close();
+            }
+        });
+
+        //Lock to ensure that AsyncIndexUpdate waits
+        asyncLock.acquire();
+
+        t.start();
+
+        //Wait till async gets to wait state i.e. inside run
+        while(!asyncLock.hasQueuedThreads());
+
+        LogCustomizer lc = createLogCustomizer(Level.DEBUG);
+        closer.start();
+
+        //Wait till stopFlag is set
+        while(!async.isClosed());
+
+        assertLogPhrase(lc.getLogs(), "[SOFT LIMIT HIT]");
+
+        //Let indexing run complete now
+        asyncLock.release();
+
+        //Wait for both threads
+        t.join();
+
+        //Async run would have failed with exception
+        assertNotNull(async.getIndexStats().getLatestError());
+
+        //Wait for close call to complete
+        closer.join();
+    }
+
+
+    private void assertLogPhrase(List<String> logs, String logPhrase){
+        assertThat(logs.toString(), containsString(logPhrase));
+    }
+
+    private static LogCustomizer createLogCustomizer(Level level){
+        LogCustomizer lc = LogCustomizer.forLogger(AsyncIndexUpdate.class.getName())
+                .filter(level)
+                .enable(level)
+                .create();
+        lc.starting();
+        return lc;
+    }
 
 }