You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2015/04/27 12:35:50 UTC

svn commit: r1676238 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/ main/java/org/apache/jackrabbit/oak/api/jmx/ main/java/org/apache/jackrabbit/oak/plugins/index/ test/java/org/apache/jackrabbit/oak/plugins/index/

Author: alexparvulescu
Date: Mon Apr 27 10:35:49 2015
New Revision: 1676238

URL: http://svn.apache.org/r1676238
Log:
OAK-2749 Provide a "different lane" for slow indexers in async indexing

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Mon Apr 27 10:35:49 2015
@@ -21,14 +21,15 @@ import static com.google.common.base.Pre
 import static com.google.common.collect.Lists.newArrayList;
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
-import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -48,7 +49,6 @@ import javax.management.ObjectName;
 import javax.security.auth.login.LoginException;
 
 import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
@@ -56,7 +56,6 @@ import com.google.common.io.Closer;
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.api.Root;
-import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.QueryEngineSettingsMBean;
 import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean;
 import org.apache.jackrabbit.oak.core.ContentRepositoryImpl;
@@ -66,6 +65,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexMBeanRegistration;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
 import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
 import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounterMBean;
@@ -308,11 +308,11 @@ public class Oak {
     };
 
     /**
-     * Flag controlling the asynchronous indexing behavior. If false (default)
-     * there will be no background indexing happening.
-     * 
+     * Map containing the (names -> delayInSecods) of the background indexing
+     * tasks that need to be started with this repository. A {@code null} value
+     * means no background tasks will run.
      */
-    private boolean asyncIndexing = false;
+    private Map<String, Long> asyncTasks;
 
     public Oak(NodeStore store) {
         this.store = checkNotNull(store);
@@ -504,15 +504,38 @@ public class Oak {
     }
 
     /**
+     * <p>
      * Enable the asynchronous (background) indexing behavior.
-     *
-     * Please not that when enabling the background indexer, you need to take
+     * </p>
+     * <p>
+     * Please note that when enabling the background indexer, you need to take
      * care of calling
      * <code>#shutdown<code> on the <code>executor<code> provided for this Oak instance.
-     *
+     * </p>
+     * @deprecated Use {@link Oak#withAsyncIndexing(String, long)} instead
      */
+    @Deprecated
     public Oak withAsyncIndexing() {
-        this.asyncIndexing = true;
+        return withAsyncIndexing("async", 5);
+    }
+
+    /**
+     * <p>
+     * Enable the asynchronous (background) indexing behavior for the provided
+     * task name.
+     * </p>
+     * <p>
+     * Please note that when enabling the background indexer, you need to take
+     * care of calling
+     * <code>#shutdown<code> on the <code>executor<code> provided for this Oak instance.
+     * </p>
+     */
+    public Oak withAsyncIndexing(@Nonnull String name, long delayInSeconds) {
+        if (this.asyncTasks == null) {
+            asyncTasks = new HashMap<String, Long>();
+        }
+        checkState(delayInSeconds > 0, "delayInSeconds value must be > 0");
+        asyncTasks.put(checkNotNull(name), delayInSeconds);
         return this;
     }
 
@@ -537,25 +560,25 @@ public class Oak {
         initHooks.add(new EditorHook(CompositeEditorProvider
                 .compose(editorProviders)));
 
-        if (asyncIndexing) {
-            String name = "async";
-            AsyncIndexUpdate task = new AsyncIndexUpdate(name, store,
-                    indexEditors);
-            regs.add(scheduleWithFixedDelay(whiteboard, task, 5, true));
-            regs.add(registerMBean(whiteboard, IndexStatsMBean.class,
-                    task.getIndexStats(), IndexStatsMBean.TYPE, name));
-            // Register AsyncIndexStats for execution stats update
-            regs.add(
-                scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1, false));
+        if (asyncTasks != null) {
+            IndexMBeanRegistration indexRegistration = new IndexMBeanRegistration(
+                    whiteboard);
+            regs.add(indexRegistration);
+            for (Entry<String, Long> t : asyncTasks.entrySet()) {
+                AsyncIndexUpdate task = new AsyncIndexUpdate(t.getKey(), store,
+                        indexEditors);
+                indexRegistration.registerAsyncIndexer(task, t.getValue());
+            }
 
+            // TODO verify how this fits in with OAK-2749
             PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(
                     new AsyncIndexUpdate(IndexConstants.ASYNC_REINDEX_VALUE,
                             store, indexEditors, true), getExecutor());
             regs.add(registerMBean(whiteboard,
                     PropertyIndexAsyncReindexMBean.class, asyncPI,
-                    PropertyIndexAsyncReindexMBean.TYPE, name));
+                    PropertyIndexAsyncReindexMBean.TYPE, "async"));
         }
-        
+
         regs.add(registerMBean(whiteboard, NodeCounterMBean.class,
                 new NodeCounter(store), NodeCounterMBean.TYPE, "nodeCounter"));
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Mon Apr 27 10:35:49 2015
@@ -19,6 +19,9 @@ package org.apache.jackrabbit.oak.api.jm
 
 import javax.management.openmbean.CompositeData;
 
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import org.apache.jackrabbit.oak.commons.jmx.Name;
+
 public interface IndexStatsMBean {
 
     String TYPE = "IndexStats";
@@ -140,4 +143,23 @@ public interface IndexStatsMBean {
      * Resets the consolidated stats.
      */
     void resetConsolidatedExecutionStats();
+
+    /**
+     * Splits the current indexing tasks into 2, indexes that are passed in as
+     * an input will have their 'async' property updated to
+     * {@code newIndexTaskName}.
+     * 
+     * Note that this call will *not* bootstrap a new indexing task for the
+     * given name.
+     */
+    void splitIndexingTask(
+            @Name("paths") @Description("Comma separated list of paths of the index definitions") String paths,
+            @Name("newIndexTaskName") @Description("The indexing task name set on the async property") String newIndexTaskName);
+
+    /**
+     * Starts a new background indexing task and registers the JMX MBeans for it
+     * 
+     */
+    void registerAsyncIndexer(@Name("name") String name,
+            @Name("delayInSeconds") long delayInSeconds);
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Apr 27 10:35:49 2015
@@ -19,6 +19,7 @@
 package org.apache.jackrabbit.oak.plugins.index;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Sets.newHashSet;
 import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
 import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
@@ -32,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -41,13 +43,15 @@ import javax.management.openmbean.OpenTy
 import javax.management.openmbean.SimpleType;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
+
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
@@ -135,10 +139,20 @@ public class AsyncIndexUpdate implements
 
     private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy();
 
+    /**
+     * Property name which stores the temporary checkpoint that need to be released on the next run
+     */
+    private final String tempCpName;
+
+    private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();
+
+    private IndexMBeanRegistration mbeanRegistration;
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
         this.lastIndexedTo = name + "-LastIndexedTo";
+        this.tempCpName = name + "-temp";
         this.store = checkNotNull(store);
         this.provider = checkNotNull(provider);
         this.switchOnSync = switchOnSync;
@@ -166,7 +180,6 @@ public class AsyncIndexUpdate implements
         private long updates = 0;
 
         private final String leaseName;
-        private final String tempCpName;
 
         public AsyncUpdateCallback(String checkpoint, String afterCheckpoint)
                 throws CommitFailedException {
@@ -174,7 +187,6 @@ public class AsyncIndexUpdate implements
             this.checkpoint = checkpoint;
             this.lease = now + 2 * ASYNC_TIMEOUT;
             this.leaseName = name + "-lease";
-            this.tempCpName = name + "-temp";
 
             NodeState root = store.getRoot();
             long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
@@ -199,14 +211,14 @@ public class AsyncIndexUpdate implements
             indexStats.setProcessedCheckpoint(afterCheckpoint);
 
             // try to drop temp cps, add 'currentCp' to the temp cps list
-            Set<String> temps = Sets.newHashSet();
+            Set<String> temps = newHashSet();
             for (String cp : getStrings(async, tempCpName)) {
                 if (cp.equals(checkpoint)) {
                     temps.add(cp);
                     continue;
                 }
                 boolean released = store.release(cp);
-                log.debug("Releasing temporary checkpoint {}: {}", cp, released);
+                log.debug("[{}] Releasing temporary checkpoint {}: {}", name, cp, released);
                 if (!released) {
                     temps.add(cp);
                 }
@@ -216,14 +228,6 @@ public class AsyncIndexUpdate implements
             indexStats.setTempCheckpoints(temps);
         }
 
-        private Iterable<String> getStrings(NodeBuilder b, String p) {
-            PropertyState ps = b.getProperty(p);
-            if (ps != null) {
-                return ps.getValue(Type.STRINGS);
-            }
-            return Sets.newHashSet();
-        }
-
         boolean isDirty() {
             return updates > 0;
         }
@@ -258,7 +262,7 @@ public class AsyncIndexUpdate implements
         if (indexStats.isPaused()) {
             return;
         }
-        log.debug("Running background index task {}", name);
+        log.debug("[{}] Running background index task", name);
 
         NodeState root = store.getRoot();
 
@@ -267,8 +271,9 @@ public class AsyncIndexUpdate implements
         long leaseEndTime = async.getLong(name + "-lease");
         long currentTime = System.currentTimeMillis();
         if (leaseEndTime > currentTime) {
-            log.debug("Another copy of the {} index update is already running;"
-                    + " skipping this update. Time left for lease to expire {}s", name, (leaseEndTime - currentTime)/1000);
+            log.debug(
+                    "[{}] Another copy of the index update is already running; skipping this update. Time left for lease to expire {}s",
+                    name, (leaseEndTime - currentTime) / 1000);
             return;
         }
 
@@ -278,20 +283,21 @@ public class AsyncIndexUpdate implements
         if (beforeCheckpoint != null) {
             NodeState state = store.retrieve(beforeCheckpoint);
             if (state == null) {
-                log.warn("Failed to retrieve previously indexed checkpoint {};"
-                        + " re-running the initial {} index update",
-                        beforeCheckpoint, name);
+                log.warn(
+                        "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
+                        name, beforeCheckpoint);
                 beforeCheckpoint = null;
                 before = MISSING_NODE;
             } else if (noVisibleChanges(state, root)) {
-                log.debug("No changes since last checkpoint;"
-                        + " skipping the {} index update", name);
+                log.debug(
+                        "[{}] No changes since last checkpoint; skipping the index update",
+                        name);
                 return;
             } else {
                 before = state;
             }
         } else {
-            log.info("Initial {} index update", name);
+            log.info("[{}] Initial index update", name);
             before = MISSING_NODE;
         }
 
@@ -299,11 +305,13 @@ public class AsyncIndexUpdate implements
         String afterTime = now();
         String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
                 "creator", AsyncIndexUpdate.class.getSimpleName(),
-                "thread", Thread.currentThread().getName()));
+                "thread", Thread.currentThread().getName(),
+                "name", name));
         NodeState after = store.retrieve(afterCheckpoint);
         if (after == null) {
-            log.debug("Unable to retrieve newly created checkpoint {},"
-                    + " skipping the {} index update", afterCheckpoint, name);
+            log.debug(
+                    "[{}] Unable to retrieve newly created checkpoint {}, skipping the index update",
+                    name, afterCheckpoint);
             return;
         }
 
@@ -313,7 +321,7 @@ public class AsyncIndexUpdate implements
 
             // the update succeeded, i.e. it no longer fails
             if (failing) {
-                log.info("Index update {} no longer fails", name);
+                log.info("[{}] Index update no longer fails", name);
                 failing = false;
             }
 
@@ -327,18 +335,22 @@ public class AsyncIndexUpdate implements
 
         } catch (CommitFailedException e) {
             if (e == CONCURRENT_UPDATE) {
-                log.debug("Concurrent update detected in the {} index update", name);
+                log.debug("[{}] Concurrent update detected in the index update", name);
             } else if (failing) {
-                log.debug("The {} index update is still failing", name, e);
+                log.debug("[{}] The index update is still failing", name, e);
             } else {
-                log.warn("The {} index update failed", name, e);
+                log.warn("[{}] The index update failed", name, e);
                 failing = true;
             }
 
         } finally {
-            if (checkpointToRelease != null) { // null during initial indexing
+            // null during initial indexing
+            // and skip release if this cp was used in a split operation
+            if (checkpointToRelease != null
+                    && !checkpointToRelease.equals(taskSplitter
+                            .getLastReferencedCp())) {
                 if (!store.release(checkpointToRelease)) {
-                    log.debug("Unable to release checkpoint {}",
+                    log.debug("[{}] Unable to release checkpoint {}", name,
                             checkpointToRelease);
                 }
             }
@@ -358,6 +370,11 @@ public class AsyncIndexUpdate implements
         // and maintaining the update lease
         AsyncUpdateCallback callback =
                 new AsyncUpdateCallback(beforeCheckpoint, afterCheckpoint);
+
+        // check for index tasks split requests, if a split happened, make
+        // sure to not delete the reference checkpoint, as the other index
+        // task will take care of it
+        taskSplitter.maybeSplit(beforeCheckpoint, callback.lease);
         try {
             NodeBuilder builder = store.getRoot().builder();
 
@@ -384,8 +401,8 @@ public class AsyncIndexUpdate implements
             } else {
                 if (switchOnSync) {
                     log.debug(
-                            "No changes detected after diff; will try to switch to synchronous updates on {}",
-                            reindexedDefinitions);
+                            "[{}] No changes detected after diff; will try to switch to synchronous updates on {}",
+                            name, reindexedDefinitions);
 
                     // no changes after diff, switch to sync on the async defs
                     for (String path : reindexedDefinitions) {
@@ -406,7 +423,8 @@ public class AsyncIndexUpdate implements
                 postAsyncRunStatsStatus(indexStats);
             }
             if (indexUpdate.isReindexingPerformed()) {
-                log.info("Reindexing ({}) completed for indexes: {} in {}", name, indexUpdate.getReindexStats(), watch);
+                log.info("[{}] Reindexing completed for indexes: {} in {}",
+                        name, indexUpdate.getReindexStats(), watch);
                 progressLogged = true;
             }
         } finally {
@@ -414,7 +432,7 @@ public class AsyncIndexUpdate implements
         }
 
         if (!progressLogged) {
-            String msg = "AsyncIndex ({}) update run completed in {}. Indexed {} nodes";
+            String msg = "[{}] AsyncIndex update run completed in {}. Indexed {} nodes";
             //Log at info level if time taken is more than 5 min
             if (watch.elapsed(TimeUnit.MINUTES) >= 5) {
                 log.info(msg, name, watch, callback.updates);
@@ -469,7 +487,12 @@ public class AsyncIndexUpdate implements
         return indexStats.getStatus() == STATUS_DONE;
     }
 
-    final class AsyncIndexStats implements IndexStatsMBean, Runnable {
+    final class AsyncIndexStats extends AnnotatedStandardMBean implements
+            IndexStatsMBean, Runnable {
+
+        protected AsyncIndexStats() {
+            super(IndexStatsMBean.class);
+        }
 
         private String start = "";
         private String done = "";
@@ -528,13 +551,13 @@ public class AsyncIndexUpdate implements
 
         @Override
         public void pause() {
-            log.debug("Pausing the async indexer");
+            log.debug("[{}] Pausing the async indexer", name);
             this.isPaused = true;
         }
 
         @Override
         public void resume() {
-            log.debug("Resuming the async indexer");
+            log.debug("[{}] Resuming the async indexer", name);
             this.isPaused = false;
         }
 
@@ -640,7 +663,7 @@ public class AsyncIndexUpdate implements
                         names,
                         new OpenType[] {SimpleType.LONG, SimpleType.LONG, SimpleType.LONG});
                 } catch (OpenDataException e) {
-                    log.warn("Error in creating CompositeType for consolidated stats", e);
+                    log.warn("[{}] Error in creating CompositeType for consolidated stats", name, e);
                 }
             }
 
@@ -669,7 +692,7 @@ public class AsyncIndexUpdate implements
                         consolidatedExecTime.longValue(), consolidatedNodes.longValue()};
                     return new CompositeDataSupport(consolidatedType, names, values);
                 } catch (Exception e) {
-                    log.error("Error retrieving consolidated stats", e);
+                    log.error("[{}] Error retrieving consolidated stats", name, e);
                     return null;
                 }
             }
@@ -685,6 +708,22 @@ public class AsyncIndexUpdate implements
                 execTimer.recordOneSecond();
             }
         }
+
+        @Override
+        public void splitIndexingTask(String paths, String newIndexTaskName) {
+            splitIndexingTask(newHashSet(Splitter.on(",").trimResults()
+                    .omitEmptyStrings().split(paths)), newIndexTaskName);
+        }
+
+        private void splitIndexingTask(Set<String> paths,
+                String newIndexTaskName) {
+            taskSplitter.registerSplit(paths, newIndexTaskName);
+        }
+
+        @Override
+        public void registerAsyncIndexer(String name, long delayInSeconds) {
+            taskSplitter.registerAsyncIndexer(name, delayInSeconds);
+        }
     }
 
     /**
@@ -729,7 +768,7 @@ public class AsyncIndexUpdate implements
     static class DefaultMissingIndexProviderStrategy extends
             MissingIndexProviderStrategy {
 
-        private final Set<String> ignore = Sets.newHashSet("disabled");
+        private final Set<String> ignore = newHashSet("disabled");
 
         @Override
         public void onMissingIndex(String type, NodeBuilder definition)
@@ -746,4 +785,112 @@ public class AsyncIndexUpdate implements
         return failing;
     }
 
+    class IndexTaskSpliter {
+
+        private Set<String> paths = null;
+        private String newIndexTaskName = null;
+        private String lastReferencedCp;
+
+        private Set<String> registeredTasks = newHashSet();
+
+        void registerSplit(Set<String> paths, String newIndexTaskName) {
+            log.info(
+                    "[{}] Registered split of following index definitions {} to new async task {}.",
+                    name, paths, newIndexTaskName);
+            this.paths = newHashSet(paths);
+            this.newIndexTaskName = newIndexTaskName;
+        }
+
+        void maybeSplit(@CheckForNull String refCheckpoint, long lease)
+                throws CommitFailedException {
+            if (paths == null) {
+                return;
+            }
+            split(refCheckpoint, lease);
+        }
+
+        private void split(@CheckForNull String refCheckpoint, long lease) throws CommitFailedException {
+            NodeBuilder builder = store.getRoot().builder();
+            if (refCheckpoint != null) {
+                NodeBuilder async = builder.child(ASYNC);
+                // add new reference
+                async.setProperty(newIndexTaskName, refCheckpoint);
+                // update old 'temp' list: remove refcp so it doesn't get released on next run
+                Set<String> temps = newHashSet();
+                for (String cp : getStrings(async, tempCpName)) {
+                    if (cp.equals(refCheckpoint)) {
+                        continue;
+                    }
+                    temps.add(cp);
+                }
+                async.setProperty(tempCpName, temps, Type.STRINGS);
+                indexStats.setTempCheckpoints(temps);
+            }
+
+            // update index defs name => newIndexTaskName
+            Set<String> updated = newHashSet();
+            for (String path : paths) {
+                NodeBuilder c = builder;
+                for (String p : elements(path)) {
+                    c = c.getChildNode(p);
+                }
+                if (c.exists() && name.equals(c.getString("async"))) {
+                    c.setProperty("async", newIndexTaskName);
+                    updated.add(path);
+                }
+            }
+
+            if (!updated.isEmpty()) {
+                mergeWithConcurrencyCheck(builder, refCheckpoint, lease);
+                log.info(
+                        "[{}] Successfully split index definitions {} to async task named {} with referenced checkpoint {}.",
+                        name, updated, newIndexTaskName, refCheckpoint);
+                lastReferencedCp = refCheckpoint;
+            }
+            paths = null;
+            newIndexTaskName = null;
+        }
+
+        public String getLastReferencedCp() {
+            return lastReferencedCp;
+        }
+
+        void registerAsyncIndexer(String newTask, long delayInSeconds) {
+            if (registeredTasks.contains(newTask)) {
+                // prevent accidental double call
+                log.warn("[{}] Task {} is already registered.", name, newTask);
+                return;
+            }
+            if (mbeanRegistration != null) {
+                log.info(
+                        "[{}] Registering a new indexing task {} running each {} seconds.",
+                        name, newTask, delayInSeconds);
+                AsyncIndexUpdate task = new AsyncIndexUpdate(newTask, store,
+                        provider);
+                mbeanRegistration.registerAsyncIndexer(task, delayInSeconds);
+                registeredTasks.add(newTask);
+            }
+        }
+    }
+
+    private static Iterable<String> getStrings(NodeBuilder b, String p) {
+        PropertyState ps = b.getProperty(p);
+        if (ps != null) {
+            return ps.getValue(Type.STRINGS);
+        }
+        return newHashSet();
+    }
+
+    IndexTaskSpliter getTaskSplitter() {
+        return taskSplitter;
+    }
+
+    public void setIndexMBeanRegistration(IndexMBeanRegistration mbeanRegistration) {
+        this.mbeanRegistration = mbeanRegistration;
+    }
+
+    protected String getName() {
+        return name;
+    }
+
 }

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java?rev=1676238&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java Mon Apr 27 10:35:49 2015
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
+
+import java.util.List;
+
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+
+import com.google.common.collect.Lists;
+
+public class IndexMBeanRegistration implements Registration {
+
+    private final Whiteboard whiteboard;
+    private final List<Registration> regs = Lists.newArrayList();
+
+    public IndexMBeanRegistration(Whiteboard whiteboard) {
+        this.whiteboard = whiteboard;
+    }
+
+    public void registerAsyncIndexer(AsyncIndexUpdate task, long delayInSeconds) {
+        task.setIndexMBeanRegistration(this);
+        regs.add(scheduleWithFixedDelay(whiteboard, task, delayInSeconds, true));
+        regs.add(registerMBean(whiteboard, IndexStatsMBean.class,
+                task.getIndexStats(), IndexStatsMBean.TYPE, task.getName()));
+        // Register AsyncIndexStats for execution stats update
+        regs.add(scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1,
+                false));
+    }
+
+    @Override
+    public void unregister() {
+        new CompositeRegistration(regs).unregister();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon Apr 27 10:35:49 2015
@@ -25,6 +25,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
@@ -680,4 +682,126 @@ public class AsyncIndexUpdateTest {
 
     }
 
+    @Test
+    public void taskSplit() throws Exception {
+        MemoryNodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        NodeBuilder builder = store.getRoot().builder();
+
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "changedIndex", true, false, ImmutableSet.of("bar"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "ignored1", true, false, ImmutableSet.of("baz"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async-ignored");
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "ignored2", true, false, ImmutableSet.of("etc"), null);
+
+        builder.child("testRoot").setProperty("foo", "abc");
+        builder.child("testRoot").setProperty("bar", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        assertTrue("Expecting no checkpoints",
+                store.listCheckpoints().size() == 0);
+
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        String firstCp = store.listCheckpoints().iterator().next();
+
+        builder = store.getRoot().builder();
+        builder.child("testRoot").setProperty("foo", "def");
+        builder.child("testRoot").setProperty("bar", "def");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        IndexTaskSpliter splitter = async.getTaskSplitter();
+        splitter.registerSplit(newHashSet("/oak:index/changedIndex"), "async-slow");
+
+        async.run();
+
+        Set<String> checkpoints = newHashSet(store.listCheckpoints());
+
+        assertTrue("Expecting two checkpoints",
+                checkpoints.size() == 2);
+        assertTrue(checkpoints.remove(firstCp));
+        String secondCp = checkpoints.iterator().next();
+
+        NodeState asyncNode = store.getRoot().getChildNode(
+                AsyncIndexUpdate.ASYNC);
+        assertEquals(firstCp, asyncNode.getString("async-slow"));
+        assertEquals(secondCp, asyncNode.getString("async"));
+        assertFalse(newHashSet(asyncNode.getStrings("async-temp")).contains(
+                firstCp));
+
+        NodeState indexNode = store.getRoot().getChildNode(
+                INDEX_DEFINITIONS_NAME);
+        assertEquals("async",
+                indexNode.getChildNode("rootIndex").getString("async"));
+        assertEquals("async-ignored", indexNode.getChildNode("ignored1")
+                .getString("async"));
+        assertNull(indexNode.getChildNode("ignored2").getString("async"));
+
+        assertEquals("async-slow", indexNode.getChildNode("changedIndex")
+                .getString("async"));
+        assertEquals(false,
+                indexNode.getChildNode("changedIndex").getBoolean("reindex"));
+
+        // new index task is on previous checkpoint
+        PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
+        assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "abc"));
+        assertEquals(ImmutableSet.of(), find(lookup, "bar", "def"));
+    }
+
+    @Test
+    public void taskSplitNoMatch() throws Exception {
+        MemoryNodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        NodeBuilder builder = store.getRoot().builder();
+
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "ignored", true, false, ImmutableSet.of("baz"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async-ignored");
+
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        assertTrue("Expecting no checkpoints",
+                store.listCheckpoints().size() == 0);
+
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        String firstCp = store.listCheckpoints().iterator().next();
+
+        builder = store.getRoot().builder();
+        builder.child("testRoot").setProperty("foo", "def");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        IndexTaskSpliter splitter = async.getTaskSplitter();
+        // no match on the provided path
+        splitter.registerSplit(newHashSet("/oak:index/ignored"), "async-slow");
+        async.run();
+
+        Set<String> checkpoints = newHashSet(store.listCheckpoints());
+
+        assertTrue("Expecting a single checkpoint",
+                checkpoints.size() == 1);
+        String secondCp = checkpoints.iterator().next();
+
+        NodeState asyncNode = store.getRoot().getChildNode(
+                AsyncIndexUpdate.ASYNC);
+        assertEquals(secondCp, asyncNode.getString("async"));
+        assertNull(firstCp, asyncNode.getString("async-slow"));
+
+    }
 }