You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2013/04/26 14:45:04 UTC
svn commit: r1476173 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/index/
test/java/org/apache/jackrabbit/oak/plugins/index/
Author: alexparvulescu
Date: Fri Apr 26 12:45:03 2013
New Revision: 1476173
URL: http://svn.apache.org/r1476173
Log:
OAK-763 Asynchronous indexing
- first draft in
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1476173&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Fri Apr 26 12:45:03 2013
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.getBoolean;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.getString;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.isIndexNodeType;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncIndexUpdate implements Runnable {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(AsyncIndexUpdate.class);
+
+ private final static int CONFIG_WATCH_DELAY_MS = 30000;
+
+ // TODO index impl run frequency could be picked up from the index config
+ // directly
+ private final static int INDEX_TASK_DELAY_MS = 5000;
+
+ private final NodeStore store;
+
+ private final ScheduledExecutorService executor;
+
+ private final IndexEditorProvider provider;
+
+ private NodeState current = EmptyNodeState.EMPTY_NODE;
+
+ final Map<String, IndexTask> active = new ConcurrentHashMap<String, IndexTask>();
+
+ private boolean started;
+
+ public AsyncIndexUpdate(@Nonnull NodeStore store,
+ @Nonnull ScheduledExecutorService executor,
+ @Nonnull IndexEditorProvider provider) {
+ this.store = checkNotNull(store);
+ this.executor = checkNotNull(executor);
+ this.provider = checkNotNull(provider);
+ }
+
+ public synchronized void start() {
+ if (started) {
+ log.error("Background index config watcher task already started");
+ return;
+ }
+ started = true;
+ executor.scheduleWithFixedDelay(this, 100, CONFIG_WATCH_DELAY_MS,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run() {
+ log.debug("Running background index config watcher task");
+ NodeState after = store.getRoot();
+ try {
+ EditorHook hook = new EditorHook(new EditorProvider() {
+ @Override
+ public Editor getRootEditor(NodeState before, NodeState after,
+ NodeBuilder builder) {
+ return VisibleEditor.wrap(new IndexConfigWatcher());
+ }
+ });
+ hook.processCommit(current, after);
+ current = after;
+ } catch (CommitFailedException e) {
+ log.warn("IndexTask update failed", e);
+ }
+ }
+
+ public synchronized void replace(Map<String, Set<String>> async) {
+ Set<String> in = new HashSet<String>(async.keySet());
+ Set<String> existing = active.keySet();
+ for (String type : existing) {
+ if (in.remove(type)) {
+ Set<String> defs = async.get(type);
+ if (defs.isEmpty()) {
+ remove(type);
+ } else {
+ addOrUpdate(type, defs);
+ }
+ } else {
+ remove(type);
+ }
+ }
+ for (String type : in) {
+ addOrUpdate(type, async.get(type));
+ }
+ }
+
+ void addOrUpdate(String type, Set<String> defs) {
+ IndexTask task = active.get(type);
+ if (task == null) {
+ task = new IndexTask(store, provider, type, defs);
+ active.put(type, task);
+ task.start(executor);
+ } else {
+ task.update(defs);
+ }
+ }
+
+ void remove(String type) {
+ IndexTask task = active.remove(type);
+ if (task != null) {
+ task.stop();
+ }
+ }
+
+ /**
+ * This Editor is responsible for watching over changes on async index defs:
+ * added index defs and deleted index defs are pushed forward to the
+ * #replace method
+ *
+ */
+ class IndexConfigWatcher extends DefaultEditor {
+
+ private final Map<String, Set<String>> async = new HashMap<String, Set<String>>();
+
+ @Override
+ public void enter(NodeState before, NodeState after)
+ throws CommitFailedException {
+ if (!after.hasChildNode(INDEX_DEFINITIONS_NAME)) {
+ return;
+ }
+ NodeState index = after.getChildNode(INDEX_DEFINITIONS_NAME);
+ for (String indexName : index.getChildNodeNames()) {
+ NodeState indexChild = index.getChildNode(indexName);
+ if (isIndexNodeType(indexChild)) {
+ boolean isasync = getBoolean(indexChild,
+ ASYNC_PROPERTY_NAME);
+ String type = getString(indexChild, TYPE_PROPERTY_NAME);
+ if (type == null || !isasync) {
+ // skip null and non-async types
+ continue;
+ }
+ Set<String> defs = async.get(type);
+ if (defs == null) {
+ defs = new HashSet<String>();
+ async.put(type, defs);
+ }
+ defs.add(type);
+ }
+ }
+ }
+
+ @Override
+ public void leave(NodeState before, NodeState after)
+ throws CommitFailedException {
+ replace(async);
+ async.clear();
+ }
+
+ }
+
+ static class IndexTask implements Runnable {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(IndexTask.class);
+
+ private final NodeStore store;
+
+ private final String type;
+ private Set<String> defs;
+
+ private final IndexEditorProvider provider;
+
+ private ScheduledFuture<?> future;
+
+ private NodeState before;
+
+ public IndexTask(NodeStore store, IndexEditorProvider provider,
+ String type, Set<String> defs) {
+ this.store = store;
+ this.provider = provider;
+ this.type = type;
+ this.defs = defs;
+ this.before = EmptyNodeState.EMPTY_NODE;
+ }
+
+ public void update(Set<String> defs) {
+ // check of there are any changes
+ // TODO what happens when I move a def? (rm + add appears as a no-op
+ // in the set)
+ if (this.defs.equals(defs)) {
+ // no-op
+ return;
+ }
+
+ log.debug("Updated index def for type {}, reindexing", type);
+ this.defs = defs;
+ this.before = EmptyNodeState.EMPTY_NODE;
+ }
+
+ public synchronized void start(ScheduledExecutorService executor) {
+ if (future != null) {
+ throw new IllegalStateException("IndexTask has already started");
+ }
+ future = executor.scheduleWithFixedDelay(this, 100,
+ INDEX_TASK_DELAY_MS, TimeUnit.MILLISECONDS);
+ }
+
+ public synchronized void stop() {
+ if (future == null) {
+ log.warn("IndexTask has already stopped.");
+ return;
+ }
+ future.cancel(true);
+ }
+
+ @Override
+ public void run() {
+ log.debug("Running background index task for type {}.", type);
+ NodeStoreBranch branch = store.branch();
+ NodeState after = branch.getHead();
+ try {
+ EditorHook hook = new EditorHook(new TypedEditorProvider(
+ provider, type));
+ NodeState processed = hook.processCommit(before, after);
+ branch.setRoot(processed);
+ branch.merge(EmptyHook.INSTANCE);
+ before = after;
+ } catch (CommitFailedException e) {
+ log.warn("IndexTask update failed", e);
+ }
+ }
+ }
+
+ /**
+ * This creates a composite editor from a type-filtered index provider.
+ *
+ */
+ private static class TypedEditorProvider implements EditorProvider {
+
+ private final IndexEditorProvider provider;
+
+ private final String type;
+
+ public TypedEditorProvider(IndexEditorProvider provider, String type) {
+ this.type = type;
+ this.provider = provider;
+ }
+
+ /**
+ * This does not make any effort to filter async definitions. The
+ * assumption is that given an index type, all of the returned index
+ * hooks inherit the same async assumption.
+ *
+ */
+ @Override
+ public Editor getRootEditor(NodeState before, NodeState after,
+ NodeBuilder builder) {
+ return VisibleEditor.wrap(provider.getIndexEditor(type, builder));
+ }
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java?rev=1476173&r1=1476172&r2=1476173&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java Fri Apr 26 12:45:03 2013
@@ -190,10 +190,24 @@ public class IndexUtils {
}
}
+ public static String getString(NodeState state, String name) {
+ PropertyState property = state.getProperty(name);
+ if (property != null && property.getType() == STRING) {
+ return property.getValue(STRING);
+ } else {
+ return null;
+ }
+ }
+
public static boolean getBoolean(NodeBuilder builder, String name) {
PropertyState property = builder.getProperty(name);
- return property != null
- && property.getType() == BOOLEAN
+ return property != null && property.getType() == BOOLEAN
+ && property.getValue(BOOLEAN);
+ }
+
+ public static boolean getBoolean(NodeState state, String name) {
+ PropertyState property = state.getProperty(name);
+ return property != null && property.getType() == BOOLEAN
&& property.getValue(BOOLEAN);
}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1476173&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Fri Apr 26 12:45:03 2013
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTask;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.query.PropertyValues;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class AsyncIndexUpdateTest {
+
+ // TODO test index config deletes
+
+ private static Set<String> find(PropertyIndexLookup lookup, String name,
+ String value) {
+ return Sets.newHashSet(lookup.query(null, name,
+ PropertyValues.newString(value)));
+ }
+
+ private static NodeState checkPathExists(NodeState state, String... verify) {
+ NodeState c = state;
+ for (String p : verify) {
+ c = c.getChildNode(p);
+ assertTrue(c.exists());
+ }
+ return c;
+ }
+
+ /**
+ * Async Index Test
+ * <ul>
+ * <li>Add an index definition</li>
+ * <li>Add some content</li>
+ * <li>Search & verify</li>
+ * </ul>
+ *
+ */
+ @Test
+ public void testAsync() throws Exception {
+ NodeStore store = new MemoryNodeStore();
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeStoreBranch branch = store.branch();
+ NodeState root = branch.getHead();
+ NodeBuilder builder = root.builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, true);
+ builder.child("testRoot").setProperty("foo", "abc");
+
+ // merge it back in
+ branch.setRoot(builder.getNodeState());
+ branch.merge(EmptyHook.INSTANCE);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate(store, executor, provider);
+ runIndexing(async, 1);
+ root = store.getRoot();
+
+ // first check that the index content nodes exist
+ checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
+ INDEX_CONTENT_NODE_NAME);
+
+ PropertyIndexLookup lookup = new PropertyIndexLookup(root);
+ assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+ }
+
+ private static void runIndexing(AsyncIndexUpdate async, int expectedActive) {
+ async.run();
+ Map<String, IndexTask> active = async.active;
+ assertEquals(expectedActive, active.size());
+ for (IndexTask task : active.values()) {
+ task.run();
+ }
+ }
+
+ /**
+ * Async Index Test with 2 index defs at the same location
+ * <ul>
+ * <li>Add an index definition</li>
+ * <li>Add some content</li>
+ * <li>Search & verify</li>
+ * </ul>
+ *
+ */
+ @Test
+ public void testAsyncDouble() throws Exception {
+ NodeStore store = new MemoryNodeStore();
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeStoreBranch branch = store.branch();
+ NodeState root = branch.getHead();
+ NodeBuilder builder = root.builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, true);
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndexSecond", true, false, ImmutableSet.of("bar"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, true);
+
+ builder.child("testRoot").setProperty("foo", "abc")
+ .setProperty("bar", "def");
+ builder.child("testSecond").setProperty("bar", "ghi");
+
+ // merge it back in
+ branch.setRoot(builder.getNodeState());
+ branch.merge(EmptyHook.INSTANCE);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate(store, executor, provider);
+ runIndexing(async, 1);
+ root = store.getRoot();
+
+ // first check that the index content nodes exist
+ checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
+ INDEX_CONTENT_NODE_NAME);
+ checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndexSecond",
+ INDEX_CONTENT_NODE_NAME);
+
+ PropertyIndexLookup lookup = new PropertyIndexLookup(root);
+ assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+ assertEquals(ImmutableSet.of(), find(lookup, "foo", "def"));
+ assertEquals(ImmutableSet.of(), find(lookup, "foo", "ghi"));
+
+ assertEquals(ImmutableSet.of(), find(lookup, "bar", "abc"));
+ assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "def"));
+ assertEquals(ImmutableSet.of("testSecond"), find(lookup, "bar", "ghi"));
+
+ }
+
+ /**
+ * Async Index Test with 2 index defs at different tree locations
+ * <ul>
+ * <li>Add an index definition</li>
+ * <li>Add some content</li>
+ * <li>Search & verify</li>
+ * </ul>
+ *
+ */
+ @Test
+ public void testAsyncDoubleSubtree() throws Exception {
+ NodeStore store = new MemoryNodeStore();
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeStoreBranch branch = store.branch();
+ NodeState root = branch.getHead();
+ NodeBuilder builder = root.builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, true);
+ createIndexDefinition(
+ builder.child("newchild").child("other")
+ .child(INDEX_DEFINITIONS_NAME), "subIndex", true,
+ false, ImmutableSet.of("foo"), null).setProperty(
+ ASYNC_PROPERTY_NAME, true);
+
+ builder.child("testRoot").setProperty("foo", "abc");
+ builder.child("newchild").child("other").child("testChild")
+ .setProperty("foo", "xyz");
+
+ // merge it back in
+ branch.setRoot(builder.getNodeState());
+ branch.merge(EmptyHook.INSTANCE);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate(store, executor, provider);
+ runIndexing(async, 1);
+ root = store.getRoot();
+
+ // first check that the index content nodes exist
+ checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
+ INDEX_CONTENT_NODE_NAME);
+ checkPathExists(root, "newchild", "other", INDEX_DEFINITIONS_NAME,
+ "subIndex", INDEX_CONTENT_NODE_NAME);
+
+ PropertyIndexLookup lookup = new PropertyIndexLookup(root);
+ assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+
+ PropertyIndexLookup lookupChild = new PropertyIndexLookup(root
+ .getChildNode("newchild").getChildNode("other"));
+ assertEquals(ImmutableSet.of("testChild"),
+ find(lookupChild, "foo", "xyz"));
+ assertEquals(ImmutableSet.of(), find(lookupChild, "foo", "abc"));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain