You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2013/04/26 14:45:04 UTC

svn commit: r1476173 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/index/ test/java/org/apache/jackrabbit/oak/plugins/index/

Author: alexparvulescu
Date: Fri Apr 26 12:45:03 2013
New Revision: 1476173

URL: http://svn.apache.org/r1476173
Log:
OAK-763 Asynchronous indexing
 - first draft in

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1476173&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Fri Apr 26 12:45:03 2013
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.getBoolean;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.getString;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.isIndexNodeType;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncIndexUpdate implements Runnable {
+
+    private static final Logger log = LoggerFactory.getLogger(AsyncIndexUpdate.class);
+
+    /** Delay, in milliseconds, between two runs of the index config watcher. */
+    private static final int CONFIG_WATCH_DELAY_MS = 30000;
+
+    /**
+     * Delay, in milliseconds, between two runs of a per-type index task.
+     * TODO index impl run frequency could be picked up from the index config
+     * directly
+     */
+    private static final int INDEX_TASK_DELAY_MS = 5000;
+
+    /** Store whose root is read by the watcher and handed to every index task. */
+    private final NodeStore store;
+
+    /** Scheduler on which the watcher and the index tasks are scheduled. */
+    private final ScheduledExecutorService executor;
+
+    /** Provider handed to every index task created by this class. */
+    private final IndexEditorProvider provider;
+
+    /** Root recorded by the last successful watcher run; starts out empty. */
+    private NodeState current = EmptyNodeState.EMPTY_NODE;
+
+    /** Registered index tasks keyed by index type; package-visible on purpose. */
+    final Map<String, IndexTask> active = new ConcurrentHashMap<String, IndexTask>();
+
+    /** Set once {@link #start()} has scheduled the watcher. */
+    private boolean started;
+
+    /**
+     * Creates an updater; nothing is scheduled until {@link #start()} is
+     * called.
+     *
+     * @param store node store whose root is inspected for async index definitions
+     * @param executor scheduler for the watcher and the per-type index tasks
+     * @param provider provider of the index editors used by the index tasks
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    public AsyncIndexUpdate(
+            @Nonnull NodeStore store,
+            @Nonnull ScheduledExecutorService executor,
+            @Nonnull IndexEditorProvider provider) {
+        checkNotNull(store);
+        checkNotNull(executor);
+        checkNotNull(provider);
+        this.store = store;
+        this.executor = executor;
+        this.provider = provider;
+    }
+
+    /**
+     * Schedules the index config watcher on the executor: first run after
+     * 100 ms, then repeatedly with a pause of {@code CONFIG_WATCH_DELAY_MS}
+     * milliseconds between runs. A repeated call only logs an error.
+     */
+    public synchronized void start() {
+        if (!started) {
+            started = true;
+            executor.scheduleWithFixedDelay(this, 100, CONFIG_WATCH_DELAY_MS,
+                    TimeUnit.MILLISECONDS);
+        } else {
+            log.error("Background index config watcher task already started");
+        }
+    }
+
+    /**
+     * One pass of the index config watcher: hands the root recorded by the
+     * previous successful pass and the store's present root to an
+     * {@link EditorHook} backed by a fresh {@link IndexConfigWatcher}, then
+     * records the present root. If the hook fails, a warning is logged and
+     * the recorded root is left as it was.
+     */
+    @Override
+    public void run() {
+        log.debug("Running background index config watcher task");
+        final NodeState head = store.getRoot();
+        EditorProvider watcherProvider = new EditorProvider() {
+            @Override
+            public Editor getRootEditor(NodeState before, NodeState after,
+                    NodeBuilder builder) {
+                return VisibleEditor.wrap(new IndexConfigWatcher());
+            }
+        };
+        try {
+            new EditorHook(watcherProvider).processCommit(current, head);
+            current = head;
+        } catch (CommitFailedException e) {
+            log.warn("IndexTask update failed", e);
+        }
+    }
+
+    /**
+     * Brings the registered index tasks in line with {@code async}. A
+     * registered type that is missing from the map, or mapped to an empty
+     * set, is removed; every other type in the map is added or updated.
+     *
+     * @param async per index type, the set that is handed to that type's
+     *            index task
+     */
+    public synchronized void replace(Map<String, Set<String>> async) {
+        Set<String> notYetRegistered = new HashSet<String>(async.keySet());
+        for (String registeredType : active.keySet()) {
+            boolean stillConfigured = notYetRegistered.remove(registeredType);
+            if (stillConfigured && !async.get(registeredType).isEmpty()) {
+                addOrUpdate(registeredType, async.get(registeredType));
+            } else {
+                remove(registeredType);
+            }
+        }
+        // whatever is left had no registered task when this call started
+        for (String newType : notYetRegistered) {
+            addOrUpdate(newType, async.get(newType));
+        }
+    }
+
+    /**
+     * Hands {@code defs} to the task registered for {@code type}; when no
+     * task is registered yet, one is created, registered and scheduled on
+     * the executor.
+     */
+    void addOrUpdate(String type, Set<String> defs) {
+        IndexTask existing = active.get(type);
+        if (existing != null) {
+            existing.update(defs);
+            return;
+        }
+        IndexTask created = new IndexTask(store, provider, type, defs);
+        active.put(type, created);
+        created.start(executor);
+    }
+
+    /** Unregisters the task for {@code type}, if there is one, and stops it. */
+    void remove(String type) {
+        IndexTask removed = active.remove(type);
+        if (removed == null) {
+            return;
+        }
+        removed.stop();
+    }
+
+    /**
+     * This Editor is responsible for watching over changes on async index defs:
+     * when entered on a node it collects the async index definitions found
+     * directly below that node's index definitions child, grouped by index
+     * type, and on leave pushes them forward to the
+     * {@link AsyncIndexUpdate#replace(Map)} method.
+     */
+    class IndexConfigWatcher extends DefaultEditor {
+
+        /** Names of the async index definitions collected so far, keyed by index type. */
+        private final Map<String, Set<String>> async = new HashMap<String, Set<String>>();
+
+        /**
+         * Scans the index definitions child of {@code after}, if present, and
+         * records every child accepted by {@code isIndexNodeType} that has
+         * its async property set to {@code true} and carries a string type
+         * property.
+         */
+        @Override
+        public void enter(NodeState before, NodeState after)
+                throws CommitFailedException {
+            if (!after.hasChildNode(INDEX_DEFINITIONS_NAME)) {
+                return;
+            }
+            NodeState index = after.getChildNode(INDEX_DEFINITIONS_NAME);
+            for (String indexName : index.getChildNodeNames()) {
+                NodeState indexChild = index.getChildNode(indexName);
+                if (isIndexNodeType(indexChild)) {
+                    boolean isasync = getBoolean(indexChild,
+                            ASYNC_PROPERTY_NAME);
+                    String type = getString(indexChild, TYPE_PROPERTY_NAME);
+                    if (type == null || !isasync) {
+                        // skip null and non-async types
+                        continue;
+                    }
+                    Set<String> defs = async.get(type);
+                    if (defs == null) {
+                        defs = new HashSet<String>();
+                        async.put(type, defs);
+                    }
+                    // Record the definition's name. Adding the type instead
+                    // would make every set equal to {type}, so that
+                    // IndexTask#update could never notice an added or
+                    // removed definition.
+                    defs.add(indexName);
+                }
+            }
+        }
+
+        /**
+         * Hands the collected definitions to
+         * {@link AsyncIndexUpdate#replace(Map)} and then empties the map.
+         */
+        @Override
+        public void leave(NodeState before, NodeState after)
+                throws CommitFailedException {
+            replace(async);
+            async.clear();
+        }
+
+    }
+
+    /**
+     * Periodic task for a single index type. Every run hands the state
+     * recorded by the previous successful run and the head of a fresh branch
+     * to the index editors of that type and merges the outcome back into the
+     * store.
+     * <p>
+     * NOTE(review): {@link #update(Set)} and {@link #run()} both write
+     * {@code before} without synchronization and are presumably called from
+     * different threads - confirm whether this needs guarding.
+     */
+    static class IndexTask implements Runnable {
+
+        private static final Logger log = LoggerFactory
+                .getLogger(IndexTask.class);
+
+        private final NodeStore store;
+
+        private final String type;
+        private Set<String> defs;
+
+        private final IndexEditorProvider provider;
+
+        /** Handle of the scheduled execution; {@code null} while not scheduled. */
+        private ScheduledFuture<?> future;
+
+        /** State the next run starts from; initially the empty state. */
+        private NodeState before;
+
+        public IndexTask(NodeStore store, IndexEditorProvider provider,
+                String type, Set<String> defs) {
+            this.store = store;
+            this.provider = provider;
+            this.type = type;
+            this.defs = defs;
+            this.before = EmptyNodeState.EMPTY_NODE;
+        }
+
+        /**
+         * Replaces the definitions held by this task. If they differ from the
+         * ones currently held, the recorded state is reset to the empty state
+         * so that the next run starts from scratch.
+         *
+         * @param defs the definitions now configured for this task's type
+         */
+        public void update(Set<String> defs) {
+            // check if there are any changes
+            // TODO what happens when I move a def? (rm + add appears as a no-op
+            // in the set)
+            if (this.defs.equals(defs)) {
+                // no-op
+                return;
+            }
+
+            log.debug("Updated index def for type {}, reindexing", type);
+            this.defs = defs;
+            this.before = EmptyNodeState.EMPTY_NODE;
+        }
+
+        /**
+         * Schedules this task on {@code executor}: first run after 100 ms,
+         * then repeatedly with a pause of {@code INDEX_TASK_DELAY_MS}
+         * milliseconds between runs.
+         *
+         * @throws IllegalStateException if the task is currently scheduled
+         */
+        public synchronized void start(ScheduledExecutorService executor) {
+            if (future != null) {
+                throw new IllegalStateException("IndexTask has already started");
+            }
+            future = executor.scheduleWithFixedDelay(this, 100,
+                    INDEX_TASK_DELAY_MS, TimeUnit.MILLISECONDS);
+        }
+
+        /**
+         * Cancels the scheduled execution, interrupting a run in progress.
+         * Calling this while the task is not scheduled only logs a warning.
+         */
+        public synchronized void stop() {
+            if (future == null) {
+                log.warn("IndexTask has already stopped.");
+                return;
+            }
+            future.cancel(true);
+            // Forget the handle: otherwise the guard above can never fire
+            // after a real stop and a repeated stop() cancels again.
+            future = null;
+        }
+
+        /**
+         * One indexing pass. On success the branch head used for this pass
+         * becomes the starting state of the next one; on a
+         * {@link CommitFailedException} a warning is logged and the starting
+         * state is kept, so the next pass covers the same changes again.
+         */
+        @Override
+        public void run() {
+            log.debug("Running background index task for type {}.", type);
+            NodeStoreBranch branch = store.branch();
+            NodeState after = branch.getHead();
+            try {
+                EditorHook hook = new EditorHook(new TypedEditorProvider(
+                        provider, type));
+                NodeState processed = hook.processCommit(before, after);
+                branch.setRoot(processed);
+                branch.merge(EmptyHook.INSTANCE);
+                before = after;
+            } catch (CommitFailedException e) {
+                log.warn("IndexTask update failed", e);
+            }
+        }
+    }
+
+    /**
+     * Editor provider that asks an {@link IndexEditorProvider} for the index
+     * editor of one fixed index type.
+     */
+    private static class TypedEditorProvider implements EditorProvider {
+
+        private final String type;
+
+        private final IndexEditorProvider provider;
+
+        public TypedEditorProvider(IndexEditorProvider provider, String type) {
+            this.provider = provider;
+            this.type = type;
+        }
+
+        /**
+         * Async definitions are not filtered here: it is assumed that all
+         * index hooks returned for one index type share the same async
+         * setting.
+         */
+        @Override
+        public Editor getRootEditor(NodeState before, NodeState after,
+                NodeBuilder builder) {
+            Editor typed = provider.getIndexEditor(type, builder);
+            return VisibleEditor.wrap(typed);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java?rev=1476173&r1=1476172&r2=1476173&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java Fri Apr 26 12:45:03 2013
@@ -190,10 +190,24 @@ public class IndexUtils {
         }
     }
 
+    /**
+     * Reads a string property from a node state.
+     *
+     * @return the value of property {@code name} on {@code state}, or
+     *         {@code null} if the property is missing or not of type STRING
+     */
+    public static String getString(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
+        boolean isString = property != null && property.getType() == STRING;
+        return isString ? property.getValue(STRING) : null;
+    }
+
     public static boolean getBoolean(NodeBuilder builder, String name) {
         PropertyState property = builder.getProperty(name);
-        return property != null
-                && property.getType() == BOOLEAN
+        return property != null && property.getType() == BOOLEAN
+                && property.getValue(BOOLEAN);
+    }
+
+    public static boolean getBoolean(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
+        return property != null && property.getType() == BOOLEAN
                 && property.getValue(BOOLEAN);
     }
 

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1476173&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Fri Apr 26 12:45:03 2013
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTask;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.query.PropertyValues;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class AsyncIndexUpdateTest {
+
+    // TODO test index config deletes
+
+    /** Queries {@code lookup} for {@code name = value} and returns the result as a set. */
+    private static Set<String> find(PropertyIndexLookup lookup, String name,
+            String value) {
+        Set<String> hits = Sets.newHashSet(
+                lookup.query(null, name, PropertyValues.newString(value)));
+        return hits;
+    }
+
+    /**
+     * Walks down from {@code state} along the given child names, asserting
+     * that each step exists, and returns the node reached last.
+     */
+    private static NodeState checkPathExists(NodeState state, String... verify) {
+        NodeState node = state;
+        for (int i = 0; i < verify.length; i++) {
+            node = node.getChildNode(verify[i]);
+            assertTrue(node.exists());
+        }
+        return node;
+    }
+
+    /**
+     * Async Index Test
+     * <ul>
+     * <li>Add an async index definition and some content</li>
+     * <li>Run the config watcher and the resulting index task once</li>
+     * <li>Search and verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsync() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        try {
+            NodeStoreBranch branch = store.branch();
+            NodeState root = branch.getHead();
+            NodeBuilder builder = root.builder();
+            createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                    "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                    .setProperty(ASYNC_PROPERTY_NAME, true);
+            builder.child("testRoot").setProperty("foo", "abc");
+
+            // merge it back in
+            branch.setRoot(builder.getNodeState());
+            branch.merge(EmptyHook.INSTANCE);
+
+            AsyncIndexUpdate async = new AsyncIndexUpdate(store, executor,
+                    provider);
+            runIndexing(async, 1);
+            root = store.getRoot();
+
+            // first check that the index content nodes exist
+            checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
+                    INDEX_CONTENT_NODE_NAME);
+
+            PropertyIndexLookup lookup = new PropertyIndexLookup(root);
+            assertEquals(ImmutableSet.of("testRoot"),
+                    find(lookup, "foo", "abc"));
+        } finally {
+            // the watcher run schedules the index task on this executor, so
+            // it has to be shut down or the task outlives the test
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Runs the config watcher once on the calling thread, checks how many
+     * index tasks are registered afterwards and then runs each of them once,
+     * also on the calling thread.
+     */
+    private static void runIndexing(AsyncIndexUpdate async, int expectedActive) {
+        async.run();
+        assertEquals(expectedActive, async.active.size());
+        for (IndexTask indexTask : async.active.values()) {
+            indexTask.run();
+        }
+    }
+
+    /**
+     * Async Index Test with 2 index defs at the same location
+     * <ul>
+     * <li>Add two async index definitions and some content</li>
+     * <li>Run the config watcher and the resulting index task once</li>
+     * <li>Search and verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsyncDouble() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        try {
+            NodeStoreBranch branch = store.branch();
+            NodeState root = branch.getHead();
+            NodeBuilder builder = root.builder();
+            createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                    "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                    .setProperty(ASYNC_PROPERTY_NAME, true);
+            createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                    "rootIndexSecond", true, false, ImmutableSet.of("bar"),
+                    null).setProperty(ASYNC_PROPERTY_NAME, true);
+
+            builder.child("testRoot").setProperty("foo", "abc")
+                    .setProperty("bar", "def");
+            builder.child("testSecond").setProperty("bar", "ghi");
+
+            // merge it back in
+            branch.setRoot(builder.getNodeState());
+            branch.merge(EmptyHook.INSTANCE);
+
+            AsyncIndexUpdate async = new AsyncIndexUpdate(store, executor,
+                    provider);
+            // a single index task is expected for the two definitions
+            runIndexing(async, 1);
+            root = store.getRoot();
+
+            // first check that the index content nodes exist
+            checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
+                    INDEX_CONTENT_NODE_NAME);
+            checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndexSecond",
+                    INDEX_CONTENT_NODE_NAME);
+
+            PropertyIndexLookup lookup = new PropertyIndexLookup(root);
+            assertEquals(ImmutableSet.of("testRoot"),
+                    find(lookup, "foo", "abc"));
+            assertEquals(ImmutableSet.of(), find(lookup, "foo", "def"));
+            assertEquals(ImmutableSet.of(), find(lookup, "foo", "ghi"));
+
+            assertEquals(ImmutableSet.of(), find(lookup, "bar", "abc"));
+            assertEquals(ImmutableSet.of("testRoot"),
+                    find(lookup, "bar", "def"));
+            assertEquals(ImmutableSet.of("testSecond"),
+                    find(lookup, "bar", "ghi"));
+        } finally {
+            // the watcher run schedules the index task on this executor, so
+            // it has to be shut down or the task outlives the test
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Async Index Test with 2 index defs at different tree locations
+     * <ul>
+     * <li>Add one async index definition at the root and one in a subtree,
+     * plus some content for each</li>
+     * <li>Run the config watcher and the resulting index task once</li>
+     * <li>Search and verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsyncDoubleSubtree() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        try {
+            NodeStoreBranch branch = store.branch();
+            NodeState root = branch.getHead();
+            NodeBuilder builder = root.builder();
+            createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                    "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                    .setProperty(ASYNC_PROPERTY_NAME, true);
+            createIndexDefinition(
+                    builder.child("newchild").child("other")
+                            .child(INDEX_DEFINITIONS_NAME), "subIndex", true,
+                    false, ImmutableSet.of("foo"), null).setProperty(
+                    ASYNC_PROPERTY_NAME, true);
+
+            builder.child("testRoot").setProperty("foo", "abc");
+            builder.child("newchild").child("other").child("testChild")
+                    .setProperty("foo", "xyz");
+
+            // merge it back in
+            branch.setRoot(builder.getNodeState());
+            branch.merge(EmptyHook.INSTANCE);
+
+            AsyncIndexUpdate async = new AsyncIndexUpdate(store, executor,
+                    provider);
+            runIndexing(async, 1);
+            root = store.getRoot();
+
+            // first check that the index content nodes exist
+            checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
+                    INDEX_CONTENT_NODE_NAME);
+            checkPathExists(root, "newchild", "other", INDEX_DEFINITIONS_NAME,
+                    "subIndex", INDEX_CONTENT_NODE_NAME);
+
+            PropertyIndexLookup lookup = new PropertyIndexLookup(root);
+            assertEquals(ImmutableSet.of("testRoot"),
+                    find(lookup, "foo", "abc"));
+
+            PropertyIndexLookup lookupChild = new PropertyIndexLookup(root
+                    .getChildNode("newchild").getChildNode("other"));
+            assertEquals(ImmutableSet.of("testChild"),
+                    find(lookupChild, "foo", "xyz"));
+            assertEquals(ImmutableSet.of(), find(lookupChild, "foo", "abc"));
+        } finally {
+            // the watcher run schedules the index task on this executor, so
+            // it has to be shut down or the task outlives the test
+            executor.shutdownNow();
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain