You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2015/04/27 12:35:50 UTC
svn commit: r1676238 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/
main/java/org/apache/jackrabbit/oak/api/jmx/
main/java/org/apache/jackrabbit/oak/plugins/index/
test/java/org/apache/jackrabbit/oak/plugins/index/
Author: alexparvulescu
Date: Mon Apr 27 10:35:49 2015
New Revision: 1676238
URL: http://svn.apache.org/r1676238
Log:
OAK-2749 Provide a "different lane" for slow indexers in async indexing
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Mon Apr 27 10:35:49 2015
@@ -21,14 +21,15 @@ import static com.google.common.base.Pre
import static com.google.common.collect.Lists.newArrayList;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
-import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -48,7 +49,6 @@ import javax.management.ObjectName;
import javax.security.auth.login.LoginException;
import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
@@ -56,7 +56,6 @@ import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.api.ContentRepository;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.api.Root;
-import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.QueryEngineSettingsMBean;
import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean;
import org.apache.jackrabbit.oak.core.ContentRepositoryImpl;
@@ -66,6 +65,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexMBeanRegistration;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounterMBean;
@@ -308,11 +308,11 @@ public class Oak {
};
/**
- * Flag controlling the asynchronous indexing behavior. If false (default)
- * there will be no background indexing happening.
- *
+ * Map containing the (names -> delayInSecods) of the background indexing
+ * tasks that need to be started with this repository. A {@code null} value
+ * means no background tasks will run.
*/
- private boolean asyncIndexing = false;
+ private Map<String, Long> asyncTasks;
public Oak(NodeStore store) {
this.store = checkNotNull(store);
@@ -504,15 +504,38 @@ public class Oak {
}
/**
+ * <p>
* Enable the asynchronous (background) indexing behavior.
- *
- * Please not that when enabling the background indexer, you need to take
+ * </p>
+ * <p>
+ * Please note that when enabling the background indexer, you need to take
* care of calling
* <code>#shutdown<code> on the <code>executor<code> provided for this Oak instance.
- *
+ * </p>
+ * @deprecated Use {@link Oak#withAsyncIndexing(String, long)} instead
*/
+ @Deprecated
public Oak withAsyncIndexing() {
- this.asyncIndexing = true;
+ return withAsyncIndexing("async", 5);
+ }
+
+ /**
+ * <p>
+ * Enable the asynchronous (background) indexing behavior for the provided
+ * task name.
+ * </p>
+ * <p>
+ * Please note that when enabling the background indexer, you need to take
+ * care of calling
+ * <code>#shutdown<code> on the <code>executor<code> provided for this Oak instance.
+ * </p>
+ */
+ public Oak withAsyncIndexing(@Nonnull String name, long delayInSeconds) {
+ if (this.asyncTasks == null) {
+ asyncTasks = new HashMap<String, Long>();
+ }
+ checkState(delayInSeconds > 0, "delayInSeconds value must be > 0");
+ asyncTasks.put(checkNotNull(name), delayInSeconds);
return this;
}
@@ -537,25 +560,25 @@ public class Oak {
initHooks.add(new EditorHook(CompositeEditorProvider
.compose(editorProviders)));
- if (asyncIndexing) {
- String name = "async";
- AsyncIndexUpdate task = new AsyncIndexUpdate(name, store,
- indexEditors);
- regs.add(scheduleWithFixedDelay(whiteboard, task, 5, true));
- regs.add(registerMBean(whiteboard, IndexStatsMBean.class,
- task.getIndexStats(), IndexStatsMBean.TYPE, name));
- // Register AsyncIndexStats for execution stats update
- regs.add(
- scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1, false));
+ if (asyncTasks != null) {
+ IndexMBeanRegistration indexRegistration = new IndexMBeanRegistration(
+ whiteboard);
+ regs.add(indexRegistration);
+ for (Entry<String, Long> t : asyncTasks.entrySet()) {
+ AsyncIndexUpdate task = new AsyncIndexUpdate(t.getKey(), store,
+ indexEditors);
+ indexRegistration.registerAsyncIndexer(task, t.getValue());
+ }
+ // TODO verify how this fits in with OAK-2749
PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(
new AsyncIndexUpdate(IndexConstants.ASYNC_REINDEX_VALUE,
store, indexEditors, true), getExecutor());
regs.add(registerMBean(whiteboard,
PropertyIndexAsyncReindexMBean.class, asyncPI,
- PropertyIndexAsyncReindexMBean.TYPE, name));
+ PropertyIndexAsyncReindexMBean.TYPE, "async"));
}
-
+
regs.add(registerMBean(whiteboard, NodeCounterMBean.class,
new NodeCounter(store), NodeCounterMBean.TYPE, "nodeCounter"));
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Mon Apr 27 10:35:49 2015
@@ -19,6 +19,9 @@ package org.apache.jackrabbit.oak.api.jm
import javax.management.openmbean.CompositeData;
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import org.apache.jackrabbit.oak.commons.jmx.Name;
+
public interface IndexStatsMBean {
String TYPE = "IndexStats";
@@ -140,4 +143,23 @@ public interface IndexStatsMBean {
* Resets the consolidated stats.
*/
void resetConsolidatedExecutionStats();
+
+ /**
+ * Splits the current indexing tasks into 2, indexes that are passed in as
+ * an input will have their 'async' property updated to
+ * {@code newIndexTaskName}.
+ *
+ * Note that this call will *not* bootstrap a new indexing task for the
+ * given name.
+ */
+ void splitIndexingTask(
+ @Name("paths") @Description("Comma separated list of paths of the index definitions") String paths,
+ @Name("newIndexTaskName") @Description("The indexing task name set on the async property") String newIndexTaskName);
+
+ /**
+ * Starts a new background indexing task and registers the JMX MBeans for it
+ *
+ */
+ void registerAsyncIndexer(@Name("name") String name,
+ @Name("delayInSeconds") long delayInSeconds);
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Apr 27 10:35:49 2015
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.index;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
@@ -32,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -41,13 +43,15 @@ import javax.management.openmbean.OpenTy
import javax.management.openmbean.SimpleType;
import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
+
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
@@ -135,10 +139,20 @@ public class AsyncIndexUpdate implements
private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy();
+ /**
+ * Property name which stores the temporary checkpoint that need to be released on the next run
+ */
+ private final String tempCpName;
+
+ private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();
+
+ private IndexMBeanRegistration mbeanRegistration;
+
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@Nonnull IndexEditorProvider provider, boolean switchOnSync) {
this.name = checkNotNull(name);
this.lastIndexedTo = name + "-LastIndexedTo";
+ this.tempCpName = name + "-temp";
this.store = checkNotNull(store);
this.provider = checkNotNull(provider);
this.switchOnSync = switchOnSync;
@@ -166,7 +180,6 @@ public class AsyncIndexUpdate implements
private long updates = 0;
private final String leaseName;
- private final String tempCpName;
public AsyncUpdateCallback(String checkpoint, String afterCheckpoint)
throws CommitFailedException {
@@ -174,7 +187,6 @@ public class AsyncIndexUpdate implements
this.checkpoint = checkpoint;
this.lease = now + 2 * ASYNC_TIMEOUT;
this.leaseName = name + "-lease";
- this.tempCpName = name + "-temp";
NodeState root = store.getRoot();
long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
@@ -199,14 +211,14 @@ public class AsyncIndexUpdate implements
indexStats.setProcessedCheckpoint(afterCheckpoint);
// try to drop temp cps, add 'currentCp' to the temp cps list
- Set<String> temps = Sets.newHashSet();
+ Set<String> temps = newHashSet();
for (String cp : getStrings(async, tempCpName)) {
if (cp.equals(checkpoint)) {
temps.add(cp);
continue;
}
boolean released = store.release(cp);
- log.debug("Releasing temporary checkpoint {}: {}", cp, released);
+ log.debug("[{}] Releasing temporary checkpoint {}: {}", name, cp, released);
if (!released) {
temps.add(cp);
}
@@ -216,14 +228,6 @@ public class AsyncIndexUpdate implements
indexStats.setTempCheckpoints(temps);
}
- private Iterable<String> getStrings(NodeBuilder b, String p) {
- PropertyState ps = b.getProperty(p);
- if (ps != null) {
- return ps.getValue(Type.STRINGS);
- }
- return Sets.newHashSet();
- }
-
boolean isDirty() {
return updates > 0;
}
@@ -258,7 +262,7 @@ public class AsyncIndexUpdate implements
if (indexStats.isPaused()) {
return;
}
- log.debug("Running background index task {}", name);
+ log.debug("[{}] Running background index task", name);
NodeState root = store.getRoot();
@@ -267,8 +271,9 @@ public class AsyncIndexUpdate implements
long leaseEndTime = async.getLong(name + "-lease");
long currentTime = System.currentTimeMillis();
if (leaseEndTime > currentTime) {
- log.debug("Another copy of the {} index update is already running;"
- + " skipping this update. Time left for lease to expire {}s", name, (leaseEndTime - currentTime)/1000);
+ log.debug(
+ "[{}] Another copy of the index update is already running; skipping this update. Time left for lease to expire {}s",
+ name, (leaseEndTime - currentTime) / 1000);
return;
}
@@ -278,20 +283,21 @@ public class AsyncIndexUpdate implements
if (beforeCheckpoint != null) {
NodeState state = store.retrieve(beforeCheckpoint);
if (state == null) {
- log.warn("Failed to retrieve previously indexed checkpoint {};"
- + " re-running the initial {} index update",
- beforeCheckpoint, name);
+ log.warn(
+ "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
+ name, beforeCheckpoint);
beforeCheckpoint = null;
before = MISSING_NODE;
} else if (noVisibleChanges(state, root)) {
- log.debug("No changes since last checkpoint;"
- + " skipping the {} index update", name);
+ log.debug(
+ "[{}] No changes since last checkpoint; skipping the index update",
+ name);
return;
} else {
before = state;
}
} else {
- log.info("Initial {} index update", name);
+ log.info("[{}] Initial index update", name);
before = MISSING_NODE;
}
@@ -299,11 +305,13 @@ public class AsyncIndexUpdate implements
String afterTime = now();
String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
"creator", AsyncIndexUpdate.class.getSimpleName(),
- "thread", Thread.currentThread().getName()));
+ "thread", Thread.currentThread().getName(),
+ "name", name));
NodeState after = store.retrieve(afterCheckpoint);
if (after == null) {
- log.debug("Unable to retrieve newly created checkpoint {},"
- + " skipping the {} index update", afterCheckpoint, name);
+ log.debug(
+ "[{}] Unable to retrieve newly created checkpoint {}, skipping the index update",
+ name, afterCheckpoint);
return;
}
@@ -313,7 +321,7 @@ public class AsyncIndexUpdate implements
// the update succeeded, i.e. it no longer fails
if (failing) {
- log.info("Index update {} no longer fails", name);
+ log.info("[{}] Index update no longer fails", name);
failing = false;
}
@@ -327,18 +335,22 @@ public class AsyncIndexUpdate implements
} catch (CommitFailedException e) {
if (e == CONCURRENT_UPDATE) {
- log.debug("Concurrent update detected in the {} index update", name);
+ log.debug("[{}] Concurrent update detected in the index update", name);
} else if (failing) {
- log.debug("The {} index update is still failing", name, e);
+ log.debug("[{}] The index update is still failing", name, e);
} else {
- log.warn("The {} index update failed", name, e);
+ log.warn("[{}] The index update failed", name, e);
failing = true;
}
} finally {
- if (checkpointToRelease != null) { // null during initial indexing
+ // null during initial indexing
+ // and skip release if this cp was used in a split operation
+ if (checkpointToRelease != null
+ && !checkpointToRelease.equals(taskSplitter
+ .getLastReferencedCp())) {
if (!store.release(checkpointToRelease)) {
- log.debug("Unable to release checkpoint {}",
+ log.debug("[{}] Unable to release checkpoint {}", name,
checkpointToRelease);
}
}
@@ -358,6 +370,11 @@ public class AsyncIndexUpdate implements
// and maintaining the update lease
AsyncUpdateCallback callback =
new AsyncUpdateCallback(beforeCheckpoint, afterCheckpoint);
+
+ // check for index tasks split requests, if a split happened, make
+ // sure to not delete the reference checkpoint, as the other index
+ // task will take care of it
+ taskSplitter.maybeSplit(beforeCheckpoint, callback.lease);
try {
NodeBuilder builder = store.getRoot().builder();
@@ -384,8 +401,8 @@ public class AsyncIndexUpdate implements
} else {
if (switchOnSync) {
log.debug(
- "No changes detected after diff; will try to switch to synchronous updates on {}",
- reindexedDefinitions);
+ "[{}] No changes detected after diff; will try to switch to synchronous updates on {}",
+ name, reindexedDefinitions);
// no changes after diff, switch to sync on the async defs
for (String path : reindexedDefinitions) {
@@ -406,7 +423,8 @@ public class AsyncIndexUpdate implements
postAsyncRunStatsStatus(indexStats);
}
if (indexUpdate.isReindexingPerformed()) {
- log.info("Reindexing ({}) completed for indexes: {} in {}", name, indexUpdate.getReindexStats(), watch);
+ log.info("[{}] Reindexing completed for indexes: {} in {}",
+ name, indexUpdate.getReindexStats(), watch);
progressLogged = true;
}
} finally {
@@ -414,7 +432,7 @@ public class AsyncIndexUpdate implements
}
if (!progressLogged) {
- String msg = "AsyncIndex ({}) update run completed in {}. Indexed {} nodes";
+ String msg = "[{}] AsyncIndex update run completed in {}. Indexed {} nodes";
//Log at info level if time taken is more than 5 min
if (watch.elapsed(TimeUnit.MINUTES) >= 5) {
log.info(msg, name, watch, callback.updates);
@@ -469,7 +487,12 @@ public class AsyncIndexUpdate implements
return indexStats.getStatus() == STATUS_DONE;
}
- final class AsyncIndexStats implements IndexStatsMBean, Runnable {
+ final class AsyncIndexStats extends AnnotatedStandardMBean implements
+ IndexStatsMBean, Runnable {
+
+ protected AsyncIndexStats() {
+ super(IndexStatsMBean.class);
+ }
private String start = "";
private String done = "";
@@ -528,13 +551,13 @@ public class AsyncIndexUpdate implements
@Override
public void pause() {
- log.debug("Pausing the async indexer");
+ log.debug("[{}] Pausing the async indexer", name);
this.isPaused = true;
}
@Override
public void resume() {
- log.debug("Resuming the async indexer");
+ log.debug("[{}] Resuming the async indexer", name);
this.isPaused = false;
}
@@ -640,7 +663,7 @@ public class AsyncIndexUpdate implements
names,
new OpenType[] {SimpleType.LONG, SimpleType.LONG, SimpleType.LONG});
} catch (OpenDataException e) {
- log.warn("Error in creating CompositeType for consolidated stats", e);
+ log.warn("[{}] Error in creating CompositeType for consolidated stats", name, e);
}
}
@@ -669,7 +692,7 @@ public class AsyncIndexUpdate implements
consolidatedExecTime.longValue(), consolidatedNodes.longValue()};
return new CompositeDataSupport(consolidatedType, names, values);
} catch (Exception e) {
- log.error("Error retrieving consolidated stats", e);
+ log.error("[{}] Error retrieving consolidated stats", name, e);
return null;
}
}
@@ -685,6 +708,22 @@ public class AsyncIndexUpdate implements
execTimer.recordOneSecond();
}
}
+
+ @Override
+ public void splitIndexingTask(String paths, String newIndexTaskName) {
+ splitIndexingTask(newHashSet(Splitter.on(",").trimResults()
+ .omitEmptyStrings().split(paths)), newIndexTaskName);
+ }
+
+ private void splitIndexingTask(Set<String> paths,
+ String newIndexTaskName) {
+ taskSplitter.registerSplit(paths, newIndexTaskName);
+ }
+
+ @Override
+ public void registerAsyncIndexer(String name, long delayInSeconds) {
+ taskSplitter.registerAsyncIndexer(name, delayInSeconds);
+ }
}
/**
@@ -729,7 +768,7 @@ public class AsyncIndexUpdate implements
static class DefaultMissingIndexProviderStrategy extends
MissingIndexProviderStrategy {
- private final Set<String> ignore = Sets.newHashSet("disabled");
+ private final Set<String> ignore = newHashSet("disabled");
@Override
public void onMissingIndex(String type, NodeBuilder definition)
@@ -746,4 +785,112 @@ public class AsyncIndexUpdate implements
return failing;
}
+ class IndexTaskSpliter {
+
+ private Set<String> paths = null;
+ private String newIndexTaskName = null;
+ private String lastReferencedCp;
+
+ private Set<String> registeredTasks = newHashSet();
+
+ void registerSplit(Set<String> paths, String newIndexTaskName) {
+ log.info(
+ "[{}] Registered split of following index definitions {} to new async task {}.",
+ name, paths, newIndexTaskName);
+ this.paths = newHashSet(paths);
+ this.newIndexTaskName = newIndexTaskName;
+ }
+
+ void maybeSplit(@CheckForNull String refCheckpoint, long lease)
+ throws CommitFailedException {
+ if (paths == null) {
+ return;
+ }
+ split(refCheckpoint, lease);
+ }
+
+ private void split(@CheckForNull String refCheckpoint, long lease) throws CommitFailedException {
+ NodeBuilder builder = store.getRoot().builder();
+ if (refCheckpoint != null) {
+ NodeBuilder async = builder.child(ASYNC);
+ // add new reference
+ async.setProperty(newIndexTaskName, refCheckpoint);
+ // update old 'temp' list: remove refcp so it doesn't get released on next run
+ Set<String> temps = newHashSet();
+ for (String cp : getStrings(async, tempCpName)) {
+ if (cp.equals(refCheckpoint)) {
+ continue;
+ }
+ temps.add(cp);
+ }
+ async.setProperty(tempCpName, temps, Type.STRINGS);
+ indexStats.setTempCheckpoints(temps);
+ }
+
+ // update index defs name => newIndexTaskName
+ Set<String> updated = newHashSet();
+ for (String path : paths) {
+ NodeBuilder c = builder;
+ for (String p : elements(path)) {
+ c = c.getChildNode(p);
+ }
+ if (c.exists() && name.equals(c.getString("async"))) {
+ c.setProperty("async", newIndexTaskName);
+ updated.add(path);
+ }
+ }
+
+ if (!updated.isEmpty()) {
+ mergeWithConcurrencyCheck(builder, refCheckpoint, lease);
+ log.info(
+ "[{}] Successfully split index definitions {} to async task named {} with referenced checkpoint {}.",
+ name, updated, newIndexTaskName, refCheckpoint);
+ lastReferencedCp = refCheckpoint;
+ }
+ paths = null;
+ newIndexTaskName = null;
+ }
+
+ public String getLastReferencedCp() {
+ return lastReferencedCp;
+ }
+
+ void registerAsyncIndexer(String newTask, long delayInSeconds) {
+ if (registeredTasks.contains(newTask)) {
+ // prevent accidental double call
+ log.warn("[{}] Task {} is already registered.", name, newTask);
+ return;
+ }
+ if (mbeanRegistration != null) {
+ log.info(
+ "[{}] Registering a new indexing task {} running each {} seconds.",
+ name, newTask, delayInSeconds);
+ AsyncIndexUpdate task = new AsyncIndexUpdate(newTask, store,
+ provider);
+ mbeanRegistration.registerAsyncIndexer(task, delayInSeconds);
+ registeredTasks.add(newTask);
+ }
+ }
+ }
+
+ private static Iterable<String> getStrings(NodeBuilder b, String p) {
+ PropertyState ps = b.getProperty(p);
+ if (ps != null) {
+ return ps.getValue(Type.STRINGS);
+ }
+ return newHashSet();
+ }
+
+ IndexTaskSpliter getTaskSplitter() {
+ return taskSplitter;
+ }
+
+ public void setIndexMBeanRegistration(IndexMBeanRegistration mbeanRegistration) {
+ this.mbeanRegistration = mbeanRegistration;
+ }
+
+ protected String getName() {
+ return name;
+ }
+
}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java?rev=1676238&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java Mon Apr 27 10:35:49 2015
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
+
+import java.util.List;
+
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+
+import com.google.common.collect.Lists;
+
+public class IndexMBeanRegistration implements Registration {
+
+ private final Whiteboard whiteboard;
+ private final List<Registration> regs = Lists.newArrayList();
+
+ public IndexMBeanRegistration(Whiteboard whiteboard) {
+ this.whiteboard = whiteboard;
+ }
+
+ public void registerAsyncIndexer(AsyncIndexUpdate task, long delayInSeconds) {
+ task.setIndexMBeanRegistration(this);
+ regs.add(scheduleWithFixedDelay(whiteboard, task, delayInSeconds, true));
+ regs.add(registerMBean(whiteboard, IndexStatsMBean.class,
+ task.getIndexStats(), IndexStatsMBean.TYPE, task.getName()));
+ // Register AsyncIndexStats for execution stats update
+ regs.add(scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1,
+ false));
+ }
+
+ @Override
+ public void unregister() {
+ new CompositeRegistration(regs).unregister();
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1676238&r1=1676237&r2=1676238&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon Apr 27 10:35:49 2015
@@ -25,6 +25,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
@@ -680,4 +682,126 @@ public class AsyncIndexUpdateTest {
}
+ @Test
+ public void taskSplit() throws Exception {
+ MemoryNodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeBuilder builder = store.getRoot().builder();
+
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "changedIndex", true, false, ImmutableSet.of("bar"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "ignored1", true, false, ImmutableSet.of("baz"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async-ignored");
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "ignored2", true, false, ImmutableSet.of("etc"), null);
+
+ builder.child("testRoot").setProperty("foo", "abc");
+ builder.child("testRoot").setProperty("bar", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ assertTrue("Expecting no checkpoints",
+ store.listCheckpoints().size() == 0);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+ async.run();
+ assertTrue("Expecting one checkpoint",
+ store.listCheckpoints().size() == 1);
+ String firstCp = store.listCheckpoints().iterator().next();
+
+ builder = store.getRoot().builder();
+ builder.child("testRoot").setProperty("foo", "def");
+ builder.child("testRoot").setProperty("bar", "def");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ IndexTaskSpliter splitter = async.getTaskSplitter();
+ splitter.registerSplit(newHashSet("/oak:index/changedIndex"), "async-slow");
+
+ async.run();
+
+ Set<String> checkpoints = newHashSet(store.listCheckpoints());
+
+ assertTrue("Expecting two checkpoints",
+ checkpoints.size() == 2);
+ assertTrue(checkpoints.remove(firstCp));
+ String secondCp = checkpoints.iterator().next();
+
+ NodeState asyncNode = store.getRoot().getChildNode(
+ AsyncIndexUpdate.ASYNC);
+ assertEquals(firstCp, asyncNode.getString("async-slow"));
+ assertEquals(secondCp, asyncNode.getString("async"));
+ assertFalse(newHashSet(asyncNode.getStrings("async-temp")).contains(
+ firstCp));
+
+ NodeState indexNode = store.getRoot().getChildNode(
+ INDEX_DEFINITIONS_NAME);
+ assertEquals("async",
+ indexNode.getChildNode("rootIndex").getString("async"));
+ assertEquals("async-ignored", indexNode.getChildNode("ignored1")
+ .getString("async"));
+ assertNull(indexNode.getChildNode("ignored2").getString("async"));
+
+ assertEquals("async-slow", indexNode.getChildNode("changedIndex")
+ .getString("async"));
+ assertEquals(false,
+ indexNode.getChildNode("changedIndex").getBoolean("reindex"));
+
+ // new index task is on previous checkpoint
+ PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
+ assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "abc"));
+ assertEquals(ImmutableSet.of(), find(lookup, "bar", "def"));
+ }
+
+ @Test
+ public void taskSplitNoMatch() throws Exception {
+ MemoryNodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeBuilder builder = store.getRoot().builder();
+
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "ignored", true, false, ImmutableSet.of("baz"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async-ignored");
+
+ builder.child("testRoot").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ assertTrue("Expecting no checkpoints",
+ store.listCheckpoints().size() == 0);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+ async.run();
+ assertTrue("Expecting one checkpoint",
+ store.listCheckpoints().size() == 1);
+ String firstCp = store.listCheckpoints().iterator().next();
+
+ builder = store.getRoot().builder();
+ builder.child("testRoot").setProperty("foo", "def");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ IndexTaskSpliter splitter = async.getTaskSplitter();
+ // no match on the provided path
+ splitter.registerSplit(newHashSet("/oak:index/ignored"), "async-slow");
+ async.run();
+
+ Set<String> checkpoints = newHashSet(store.listCheckpoints());
+
+ assertTrue("Expecting a single checkpoint",
+ checkpoints.size() == 1);
+ String secondCp = checkpoints.iterator().next();
+
+ NodeState asyncNode = store.getRoot().getChildNode(
+ AsyncIndexUpdate.ASYNC);
+ assertEquals(secondCp, asyncNode.getString("async"));
+ assertNull(firstCp, asyncNode.getString("async-slow"));
+
+ }
}