You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2016/09/15 07:14:06 UTC

svn commit: r1760831 [1/2] - in /jackrabbit/oak/trunk/oak-lucene/src: main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ t...

Author: chetanm
Date: Thu Sep 15 07:14:06 2016
New Revision: 1760831

URL: http://svn.apache.org/viewvc?rev=1760831&view=rev
Log:
OAK-4412 - Lucene hybrid index

Base implementation for the hybrid index. The main parts are working; some TODOs and the OSGi integration are still pending

Added:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactoryTest.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java Thu Sep 15 07:14:06 2016
@@ -231,6 +231,8 @@ public final class IndexDefinition imple
 
     private final String indexPath;
 
+    private final boolean sync;
+
     @Nullable
     private final String uid;
 
@@ -303,6 +305,7 @@ public final class IndexDefinition imple
         this.secureFacets = defn.hasChildNode(FACETS) && getOptionalValue(defn.getChildNode(FACETS), PROP_SECURE_FACETS, true);
         this.suggestEnabled = evaluateSuggestionEnabled();
         this.spellcheckEnabled = evaluateSpellcheckEnabled();
+        this.sync = determineSync(defn);
     }
 
     public NodeState getDefinitionNodeState() {
@@ -433,6 +436,10 @@ public final class IndexDefinition imple
         return uid;
     }
 
+    public boolean isSync() {
+        return sync;
+    }
+
     @Override
     public String toString() {
         return "Lucene Index : " + indexName;
@@ -1565,4 +1572,10 @@ public final class IndexDefinition imple
         return version == IndexFormatVersion.V1 ?  1.5 : 1.0;
     }
 
+    private static boolean determineSync(NodeState defn) {
+        Iterable<String> async = defn.getStrings(IndexConstants.ASYNC_PROPERTY_NAME);
+        //TODO [hybrid] make it a constant
+        return Iterables.contains(async, "sync");
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java Thu Sep 15 07:14:06 2016
@@ -20,15 +20,23 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
-import com.google.common.base.Preconditions;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndex;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReader;
 import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.MultiReader;
@@ -36,18 +44,26 @@ import org.apache.lucene.search.IndexSea
 import org.apache.lucene.search.suggest.analyzing.AnalyzingInfixSuggester;
 import org.apache.lucene.store.Directory;
 
-class IndexNode {
+public class IndexNode {
 
-    static IndexNode open(String indexPath, NodeState root, NodeState defnNodeState, LuceneIndexReaderFactory readerFactory)
+    static IndexNode open(String indexPath, NodeState root, NodeState defnNodeState,
+                          LuceneIndexReaderFactory readerFactory, @Nullable NRTIndexFactory nrtFactory)
             throws IOException {
         IndexDefinition definition = new IndexDefinition(root, defnNodeState);
         List<LuceneIndexReader> readers = readerFactory.createReaders(definition, defnNodeState, indexPath);
+        NRTIndex nrtIndex = nrtFactory != null ? nrtFactory.createIndex(definition) : null;
         if (!readers.isEmpty()){
-            return new IndexNode(PathUtils.getName(indexPath), definition, readers);
+            return new IndexNode(PathUtils.getName(indexPath), definition, readers, nrtIndex);
         }
         return null;
     }
 
+    /**
+     * Time interval (in milliseconds) after which readers are refreshed in case of a real time index
+     * TODO Make this configurable
+     */
+    private final long refreshDelta = TimeUnit.SECONDS.toMillis(1);
+
     private final List<LuceneIndexReader> readers;
 
     private final String name;
@@ -56,17 +72,22 @@ class IndexNode {
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private final IndexSearcher indexSearcher;
+    private volatile IndexSearcher indexSearcher;
+
+    private final NRTIndex nrtIndex;
 
     private boolean closed = false;
 
-    IndexNode(String name, IndexDefinition definition, List<LuceneIndexReader> readers)
+    private long lastRefreshTime;
+
+    IndexNode(String name, IndexDefinition definition, List<LuceneIndexReader> readers, @Nullable NRTIndex nrtIndex)
             throws IOException {
         checkArgument(!readers.isEmpty());
         this.name = name;
         this.definition = definition;
         this.readers = readers;
-        this.indexSearcher = new IndexSearcher(createReader(readers));
+        this.nrtIndex = nrtIndex;
+        this.indexSearcher = new IndexSearcher(createReader());
     }
 
     String getName() {
@@ -77,7 +98,7 @@ class IndexNode {
         return definition;
     }
 
-    IndexSearcher getSearcher() {
+    public IndexSearcher getSearcher() {
         return indexSearcher;
     }
 
@@ -99,7 +120,7 @@ class IndexNode {
         }
     }
 
-    void release() {
+    public void release() {
         lock.readLock().unlock();
     }
 
@@ -112,9 +133,32 @@ class IndexNode {
             lock.writeLock().unlock();
         }
 
-       for (LuceneIndexReader reader : readers){
+        //Do not close the NRTIndex here as it might be in use
+        //by a newer IndexNode. Just close the readers obtained from
+        //it
+        for (LuceneIndexReader reader : Iterables.concat(readers, getNRTReaders())){
            reader.close();
-       }
+        }
+    }
+
+    @CheckForNull
+    public LuceneIndexWriter getLocalWriter() throws IOException{
+        return nrtIndex != null ? nrtIndex.getWriter() : null;
+    }
+
+    public void refreshReaders(long currentTime) {
+        //TODO [hybrid] Refreshing currently requires updates to happen.
+        //However, if no update happens after the last update, the refresh would not
+        //happen and the result would remain stale up to the next async cycle. Possibly
+        //introduce a refresh policy
+        if (currentTime - lastRefreshTime > refreshDelta){
+            lastRefreshTime = currentTime;
+            indexSearcher = new IndexSearcher(createReader());
+        }
+    }
+
+    public long getRefreshDelta() {
+        return refreshDelta;
     }
 
     private LuceneIndexReader getDefaultReader(){
@@ -122,14 +166,22 @@ class IndexNode {
         return readers.get(0);
     }
 
-    private IndexReader createReader(List<LuceneIndexReader> readers) {
-        if (readers.size() == 1){
+    private IndexReader createReader() {
+        List<LuceneIndexReader> nrtReaders = getNRTReaders();
+        if (readers.size() == 1 && nrtReaders.isEmpty()){
             return readers.get(0).getReader();
         }
-        IndexReader[] readerArr = new IndexReader[readers.size()];
-        for (int i = 0; i < readerArr.length; i++) {
-            readerArr[i] = readers.get(i).getReader();
+        IndexReader[] readerArr = new IndexReader[readers.size() + nrtReaders.size()];
+        int i = 0;
+        for (LuceneIndexReader r : Iterables.concat(readers, nrtReaders)){
+            readerArr[i++] = r.getReader();
         }
         return new MultiReader(readerArr, true);
     }
+
+    private List<LuceneIndexReader> getNRTReaders() {
+        return nrtIndex != null ? nrtIndex.getReaders() : Collections.<LuceneIndexReader>emptyList();
+    }
+
+
 }

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java Thu Sep 15 07:14:06 2016
@@ -34,7 +34,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory;
 import org.apache.jackrabbit.oak.spi.commit.CompositeEditor;
@@ -51,7 +54,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
-class IndexTracker {
+public class IndexTracker {
 
     /** Logger instance. */
     private static final Logger log = LoggerFactory.getLogger(IndexTracker.class);
@@ -59,6 +62,7 @@ class IndexTracker {
             new PerfLogger(LoggerFactory.getLogger(IndexTracker.class.getName() + ".perf"));
 
     private final LuceneIndexReaderFactory readerFactory;
+    private final NRTIndexFactory nrtFactory;
 
     private NodeState root = EMPTY_NODE;
 
@@ -66,7 +70,7 @@ class IndexTracker {
 
     private volatile boolean refresh;
 
-    IndexTracker() {
+    public IndexTracker() {
         this((IndexCopier)null);
     }
 
@@ -74,8 +78,13 @@ class IndexTracker {
         this(new DefaultIndexReaderFactory(Mounts.defaultMountInfoProvider(), cloner));
     }
 
-    IndexTracker(LuceneIndexReaderFactory readerFactory){
+    IndexTracker(LuceneIndexReaderFactory readerFactory) {
+        this(readerFactory, null);
+    }
+
+    public IndexTracker(LuceneIndexReaderFactory readerFactory, @Nullable NRTIndexFactory nrtFactory){
         this.readerFactory = readerFactory;
+        this.nrtFactory = nrtFactory;
     }
 
     synchronized void close() {
@@ -91,7 +100,7 @@ class IndexTracker {
         }
     }
 
-    synchronized void update(final NodeState root) {
+    public synchronized void update(final NodeState root) {
         if (refresh) {
             this.root = root;
             close();
@@ -115,7 +124,7 @@ class IndexTracker {
                 public void leave(NodeState before, NodeState after) {
                     try {
                         long start = PERF_LOGGER.start();
-                        IndexNode index = IndexNode.open(path, root, after, readerFactory);
+                        IndexNode index = IndexNode.open(path, root, after, readerFactory, nrtFactory);
                         PERF_LOGGER.end(start, -1, "[{}] Index found to be updated. Reopening the IndexNode", path);
                         updates.put(path, index); // index can be null
                     } catch (IOException e) {
@@ -153,7 +162,7 @@ class IndexTracker {
         refresh = true;
     }
 
-    IndexNode acquireIndexNode(String path) {
+    public IndexNode acquireIndexNode(String path) {
         IndexNode index = indices.get(path);
         if (index != null && index.acquire()) {
             return index;
@@ -183,7 +192,7 @@ class IndexTracker {
 
         try {
             if (isLuceneIndexNode(node)) {
-                index = IndexNode.open(path, root, node, readerFactory);
+                index = IndexNode.open(path, root, node, readerFactory, nrtFactory);
                 if (index != null) {
                     checkState(index.acquire());
                     indices = ImmutableMap.<String, IndexNode>builder()

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java Thu Sep 15 07:14:06 2016
@@ -19,10 +19,17 @@ package org.apache.jackrabbit.oak.plugin
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.ContextAwareCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditor;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexWriterFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterFactory;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
@@ -31,6 +38,7 @@ import org.apache.jackrabbit.oak.spi.mou
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
 
@@ -66,7 +74,7 @@ public class LuceneIndexEditorProvider i
                                      @Nullable IndexAugmentorFactory augmentorFactory,
                                      MountInfoProvider mountInfoProvider) {
         this.indexCopier = indexCopier;
-        this.extractedTextCache = checkNotNull(extractedTextCache);
+        this.extractedTextCache = extractedTextCache != null ? extractedTextCache : new ExtractedTextCache(0, 0);
         this.augmentorFactory = augmentorFactory;
         this.indexWriterFactory = new DefaultIndexWriterFactory(checkNotNull(mountInfoProvider), indexCopier);
     }
@@ -77,8 +85,25 @@ public class LuceneIndexEditorProvider i
             @Nonnull IndexUpdateCallback callback)
             throws CommitFailedException {
         if (TYPE_LUCENE.equals(type)) {
+            checkArgument(callback instanceof ContextAwareCallback, "callback instance not of type " +
+                    "ContextAwareCallback [%s]", callback);
+            IndexingContext indexingContext = ((ContextAwareCallback)callback).getIndexingContext();
+            LuceneIndexWriterFactory writerFactory = indexWriterFactory;
+            if (!indexingContext.isAsync() && supportsSyncIndexing(definition)) {
+
+                //Would not participate in reindexing. Only interested in
+                //incremental indexing
+                if (indexingContext.isReindexing()){
+                    return null;
+                }
+                //TODO [hybrid] switch the builder to readonly one
+                //TODO [hybrid] Make use of existing IndexDefinition to avoid reinit for
+                //every commit
+                writerFactory = new LocalIndexWriterFactory(indexingContext);
+            }
+
             LuceneIndexEditorContext context = new LuceneIndexEditorContext(root, definition, callback,
-                    indexWriterFactory, extractedTextCache, augmentorFactory);
+                    writerFactory, extractedTextCache, augmentorFactory);
             return new LuceneIndexEditor(context);
         }
         return null;
@@ -91,4 +116,13 @@ public class LuceneIndexEditorProvider i
     ExtractedTextCache getExtractedTextCache() {
         return extractedTextCache;
     }
+
+    private boolean supportsSyncIndexing(NodeBuilder defn){
+        //TODO [hybrid] Similar logic exists in IndexDefinition. Should be unified
+        PropertyState async = defn.getProperty(IndexConstants.ASYNC_PROPERTY_NAME);
+        if (async == null){
+            return false;
+        }
+        return Iterables.contains(async.getValue(Type.STRINGS), "sync");
+    }
 }

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.apache.lucene.index.IndexableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+class DocumentQueue implements Closeable{
+    private static final LuceneDoc STOP = LuceneDoc.forUpdate("", "", Collections.<IndexableField>emptyList());
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final IndexTracker tracker;
+    private final BlockingQueue<LuceneDoc> docsQueue;
+    private final Clock clock;
+    private final Executor executor;
+    private volatile boolean stopped;
+
+    /**
+     * Handler for uncaught exception on the background thread
+     */
+    private final UncaughtExceptionHandler exceptionHandler = new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            log.error("Uncaught exception", e);
+        }
+    };
+
+    /**
+     * Current background task
+     */
+    private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed();
+
+    /**
+     * Completion handler: sets the current task to the next task and schedules it
+     * on the background thread.
+     */
+    private final Runnable completionHandler = new Runnable() {
+        private final Callable<Void> task = new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    LuceneDoc doc = docsQueue.poll();
+                    if (doc != null && doc != STOP) {
+                        processDoc(doc);
+                        currentTask.onComplete(completionHandler);
+                    }
+                } catch (Throwable t) {
+                    exceptionHandler.uncaughtException(Thread.currentThread(), t);
+                }
+                return null;
+            }
+        };
+
+        @Override
+        public void run() {
+            currentTask = new NotifyingFutureTask(task);
+            executor.execute(currentTask);
+        }
+    };
+
+    DocumentQueue(int maxQueueSize, IndexTracker tracker, Clock clock, Executor executor) {
+        this.docsQueue = new LinkedBlockingDeque<>(maxQueueSize);
+        this.tracker = tracker;
+        this.clock = clock;
+        this.executor = executor;
+    }
+
+    public boolean add(LuceneDoc doc){
+        checkState(!stopped);
+        boolean added = docsQueue.offer(doc);
+        // Set the completion handler on the currently running task. Multiple calls
+        // to onComplete are not a problem here since we always pass the same value.
+        // Thus there is no question as to which of the handlers will effectively run.
+        currentTask.onComplete(completionHandler);
+        //TODO log warning when queue is full
+        return added;
+    }
+
+    List<LuceneDoc> getQueuedDocs(){
+        List<LuceneDoc> docs = Lists.newArrayList();
+        docs.addAll(docsQueue);
+        return docs;
+    }
+
+    private void processDoc(LuceneDoc doc){
+        IndexNode indexNode = tracker.acquireIndexNode(doc.indexPath);
+        if (indexNode == null) {
+            log.debug("No IndexNode found for index [{}]. Skipping index entry for [{}]", doc.indexPath, doc.docPath);
+            return;
+        }
+
+        try{
+            LuceneIndexWriter writer = indexNode.getLocalWriter();
+
+            if (writer == null){
+                //IndexDefinition per IndexNode might have changed and local
+                //indexing is disabled. Ignore
+                log.debug("No local IndexWriter found for index [{}]. Skipping index " +
+                                "entry for [{}]", doc.indexPath, doc.docPath);
+                return;
+            }
+            if (doc.delete) {
+                writer.deleteDocuments(doc.docPath);
+            } else {
+                writer.updateDocument(doc.docPath, doc.doc);
+            }
+            //TODO Support for immediate refresh
+            indexNode.refreshReaders(clock.getTime());
+        } catch (Exception e) {
+            //For now we just log it. Later we need to check whether errors are frequent and,
+            //if so, temporarily disable indexing for this index
+            log.warn("Error occurred while indexing node [{}] for index [{}]",doc.docPath, doc.indexPath, e);
+        } finally {
+            indexNode.release();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        //It's fine to "drop" any entry in the queue as
+        //the local index is meant for running state only
+        docsQueue.clear();
+        docsQueue.add(STOP);
+        stopped = true;
+
+        //TODO Should we wait for STOP to be processed?
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+public class LocalIndexObserver implements Observer{
+    private final DocumentQueue docQueue;
+
+    public LocalIndexObserver(DocumentQueue docQueue) {
+        this.docQueue = docQueue;
+    }
+
+    @Override
+    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+        //TODO [hybrid] Do external diff?
+        if (info == null){
+           return;
+        }
+
+        CommitContext commitContext = (CommitContext) info.getInfo().get(CommitContext.NAME);
+        //Commit done internally i.e. one not using Root/Tree API
+        if (commitContext == null){
+            return;
+        }
+
+        LuceneDocumentHolder holder = (LuceneDocumentHolder) commitContext.get(LuceneDocumentHolder.NAME);
+        //Nothing to be indexed
+        if (holder == null){
+            return;
+        }
+
+        for (LuceneDoc doc : holder.getAsyncIndexedDocs()){
+            docQueue.add(doc);
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.index.IndexableField;
+
+/**
+ * Writer factory whose writers do not write to any Lucene index. Instead the
+ * documents are collected in the {@link LuceneDocumentHolder} stored in the
+ * {@link CommitContext} of the current commit.
+ */
+public class LocalIndexWriterFactory implements LuceneIndexWriterFactory {
+    public static final String COMMIT_PROCESSED_BY_LOCAL_LUCENE_EDITOR = "commitProcessedByLocalLuceneEditor";
+    private final IndexingContext indexingContext;
+    private final CommitContext commitContext;
+
+    /**
+     * @param indexingContext context whose commit info must carry a
+     *                        {@link CommitContext}; a missing one is rejected
+     *                        with a {@link NullPointerException}
+     */
+    public LocalIndexWriterFactory(IndexingContext indexingContext) {
+        this.indexingContext = indexingContext;
+        CommitContext found =
+                (CommitContext) indexingContext.getCommitInfo().getInfo().get(CommitContext.NAME);
+        this.commitContext = Preconditions.checkNotNull(found, "No commit context found in commit info");
+    }
+
+    @Override
+    public LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder definitionBuilder, boolean reindex) {
+        return new LocalIndexWriter(definition);
+    }
+
+    /**
+     * Returns the holder registered in the commit context, registering a
+     * fresh one on first use.
+     */
+    private LuceneDocumentHolder getDocumentHolder() {
+        Object existing = commitContext.get(LuceneDocumentHolder.NAME);
+        if (existing != null) {
+            return (LuceneDocumentHolder) existing;
+        }
+        //lazily initialize the holder
+        LuceneDocumentHolder created = new LuceneDocumentHolder();
+        commitContext.set(LuceneDocumentHolder.NAME, created);
+        return created;
+    }
+
+    /**
+     * Writer which records updates and deletes as {@link LuceneDoc} entries
+     * in the commit's document holder.
+     */
+    private class LocalIndexWriter implements LuceneIndexWriter {
+        private final IndexDefinition definition;
+        //Looked up on the first recorded document only
+        private List<LuceneDoc> docList;
+
+        public LocalIndexWriter(IndexDefinition definition) {
+            this.definition = definition;
+        }
+
+        @Override
+        public void updateDocument(String path, Iterable<? extends IndexableField> doc) throws IOException {
+            String indexPath = definition.getIndexPathFromConfig();
+            addLuceneDoc(LuceneDoc.forUpdate(indexPath, path, doc));
+        }
+
+        @Override
+        public void deleteDocuments(String path) throws IOException {
+            String indexPath = definition.getIndexPathFromConfig();
+            addLuceneDoc(LuceneDoc.forDelete(indexPath, path));
+        }
+
+        @Override
+        public boolean close(long timestamp) throws IOException {
+            //This is used by testcase
+            commitContext.set(COMMIT_PROCESSED_BY_LOCAL_LUCENE_EDITOR, Boolean.TRUE);
+            //always return false as nothing gets written to the index
+            return false;
+        }
+
+        private void addLuceneDoc(LuceneDoc luceneDoc) {
+            //TODO [hybrid] checks about the size. If too many drop
+            //However for truly sync case hold on
+            docs().add(luceneDoc);
+        }
+
+        /**
+         * Returns the list documents are recorded in, fetching it from the
+         * document holder on the first call.
+         */
+        private List<LuceneDoc> docs() {
+            if (docList == null) {
+                docList = getDocumentHolder().getAsyncIndexedDocList(indexingContext.getIndexPath());
+            }
+            return docList;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import javax.annotation.Nullable;
+
+import org.apache.lucene.index.IndexableField;
+
+/**
+ * Value object describing a single pending change for an index: either an
+ * update carrying the Lucene fields, or a delete carrying no fields.
+ */
+class LuceneDoc {
+    /** Path of the index this entry belongs to */
+    final String indexPath;
+    /** Path of the document the entry refers to */
+    final String docPath;
+    /** Fields of the document; {@code null} for entries created via {@link #forDelete} */
+    final Iterable<? extends IndexableField> doc;
+    /** {@code true} for entries created via {@link #forDelete} */
+    final boolean delete;
+
+    private LuceneDoc(String indexPath, String path, @Nullable Iterable<? extends IndexableField> doc, boolean delete) {
+        this.indexPath = indexPath;
+        this.docPath = path;
+        this.doc = doc;
+        this.delete = delete;
+    }
+
+    /**
+     * Creates an entry for an updated document.
+     */
+    public static LuceneDoc forUpdate(String indexPath, String path, Iterable<? extends IndexableField> doc){
+        final boolean isDelete = false;
+        return new LuceneDoc(indexPath, path, doc, isDelete);
+    }
+
+    /**
+     * Creates an entry for a deleted document; it carries no fields.
+     */
+    public static LuceneDoc forDelete(String indexPath, String path){
+        final boolean isDelete = true;
+        return new LuceneDoc(indexPath, path, null, isDelete);
+    }
+
+    /**
+     * @return the index path followed by the document path in parentheses
+     */
+    @Override
+    public String toString() {
+        return indexPath + "(" + docPath + ")";
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.List;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Collects {@link LuceneDoc} entries grouped by index path.
+ */
+class LuceneDocumentHolder {
+    /** Name under which the holder is stored */
+    public static final String NAME = "oak.lucene.documentHolder";
+
+    private final ListMultimap<String, LuceneDoc> docsByIndexPath = ArrayListMultimap.create();
+
+    /**
+     * @param indexPath path of the index
+     * @return the multimap's list for the given index path
+     */
+    public List<LuceneDoc> getAsyncIndexedDocList(String indexPath) {
+        List<LuceneDoc> docsForIndex = docsByIndexPath.get(indexPath);
+        return docsForIndex;
+    }
+
+    /**
+     * @return all collected documents across all index paths
+     */
+    public Iterable<LuceneDoc> getAsyncIndexedDocs(){
+        Iterable<LuceneDoc> allDocs = docsByIndexPath.values();
+        return allDocs;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReader;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.suggest.analyzing.AnalyzingInfixSuggester;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.NRTCachingDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+/**
+ * Near real time (NRT) Lucene index for a single index definition.
+ *
+ * <p>The Lucene writer and its directory are created lazily on the first
+ * {@link #getWriter()} call, in a directory obtained from the
+ * {@link IndexCopier}. Readers are opened directly from that writer. An
+ * instance may also expose a reader which was obtained from the previous
+ * {@code NRTIndex} at construction time. {@link #close()} closes the writer
+ * and the directory and removes the directory from disk.
+ *
+ * <p>NOTE(review): only {@code createWriter()} is synchronized; the
+ * null check in {@link #getWriter()} and the {@code closed} flag are not
+ * guarded - confirm that callers do not use an instance concurrently.
+ */
+public class NRTIndex implements Closeable {
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+    private static final Logger log = LoggerFactory.getLogger(NRTIndex.class);
+
+    /**
+     * Prefix used for naming the directory created for NRT indexes
+     */
+    private static final String NRT_DIR_PREFIX = "nrt-";
+
+    private final IndexDefinition definition;
+    private final IndexCopier indexCopier;
+    /** Reader obtained from the previous index; {@code null} if there is none */
+    private final LuceneIndexReader previousReader;
+
+    //All of the following are set up lazily by createWriter()
+    private IndexWriter indexWriter;
+    private NRTIndexWriter nrtIndexWriter;
+    private File indexDir;
+    private Directory directory;
+    private boolean closed;
+
+    /**
+     * @param definition  definition of the index
+     * @param indexCopier used to determine the directory for the index files
+     * @param previous    previous index whose current reader is captured;
+     *                    may be {@code null}
+     */
+    public NRTIndex(IndexDefinition definition, IndexCopier indexCopier, @Nullable NRTIndex previous) {
+        this.definition = definition;
+        this.indexCopier = indexCopier;
+        this.previousReader = previous != null ? previous.getPrimaryReader() : null;
+    }
+
+    /**
+     * @return a reader over this index, or {@code null} if nothing has been
+     * indexed so far or the reader could not be opened
+     */
+    @CheckForNull
+    LuceneIndexReader getPrimaryReader() {
+        return createReader();
+    }
+
+    /**
+     * Returns the writer for this index, creating it on first access.
+     *
+     * @throws IOException if the writer could not be created
+     * @throws IllegalStateException if the index is closed
+     */
+    public LuceneIndexWriter getWriter() throws IOException {
+        checkState(!closed);
+        if (nrtIndexWriter == null) {
+            nrtIndexWriter = createWriter();
+        }
+        return nrtIndexWriter;
+    }
+
+    /**
+     * Returns the readers to be used for querying: the reader of this index
+     * (if any) followed by the reader of the previous index (if any).
+     *
+     * @throws IllegalStateException if the index is closed
+     */
+    public List<LuceneIndexReader> getReaders() {
+        checkState(!closed);
+        List<LuceneIndexReader> readers = Lists.newArrayListWithCapacity(2);
+        LuceneIndexReader latestReader = createReader();
+        if (latestReader != null) {
+            readers.add(latestReader);
+        }
+
+        //Old reader should be added later
+        if (previousReader != null) {
+            readers.add(previousReader);
+        }
+        return readers;
+    }
+
+    /**
+     * Closes the writer and directory (if they were ever created) and
+     * deletes the index directory. Calling it again is a no-op.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        if (indexWriter != null) {
+            indexWriter.close();
+            directory.close();
+            FileUtils.deleteQuietly(indexDir);
+            log.debug("[{}] Removed directory [{}]", this, indexDir);
+        }
+
+        closed = true;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * @return the index path as configured in the definition
+     */
+    @Override
+    public String toString() {
+        return definition.getIndexPathFromConfig();
+    }
+
+    //For test
+    File getIndexDir() {
+        return indexDir;
+    }
+
+    /**
+     * Opens a new reader from the current writer.
+     *
+     * @return the reader, or {@code null} if no writer exists yet or opening
+     * the reader failed (the failure is logged)
+     */
+    @CheckForNull
+    private LuceneIndexReader createReader() {
+        checkState(!closed);
+        //Its possible that readers are obtained
+        //before anything gets indexed
+        if (indexWriter == null) {
+            return null;
+        }
+        try {
+            //applyDeletes is false as layers above would take care of
+            //stale result
+            return new NRTReader(DirectoryReader.open(indexWriter, false));
+        } catch (IOException e) {
+            //Pass the index as argument for the placeholder; the trailing
+            //exception is logged with its stacktrace
+            log.warn("Error opening index [{}]", this, e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates the on disk directory and the Lucene writer for this index.
+     * The directory name is the NRT prefix followed by the sum of the
+     * current time and a counter.
+     */
+    private synchronized NRTIndexWriter createWriter() throws IOException {
+        long uniqueCount = System.currentTimeMillis() + COUNTER.incrementAndGet();
+        String dirName = NRT_DIR_PREFIX + uniqueCount;
+        indexDir = indexCopier.getIndexDir(definition, definition.getIndexPathFromConfig(), dirName);
+        Directory fsdir = FSDirectory.open(indexDir);
+        //TODO make these configurable
+        directory = new NRTCachingDirectory(fsdir, 1, 1);
+        IndexWriterConfig config = IndexWriterUtils.getIndexWriterConfig(definition, false);
+        indexWriter = new IndexWriter(directory, config);
+        return new NRTIndexWriter(indexWriter);
+    }
+
+    /**
+     * Reader wrapper exposing only the Lucene reader; it provides neither a
+     * suggester nor a suggest directory.
+     */
+    private static class NRTReader implements LuceneIndexReader {
+        private final IndexReader indexReader;
+
+        public NRTReader(IndexReader indexReader) {
+            this.indexReader = indexReader;
+        }
+
+        @Override
+        public IndexReader getReader() {
+            return indexReader;
+        }
+
+        @Override
+        public AnalyzingInfixSuggester getLookup() {
+            return null;
+        }
+
+        @Override
+        public Directory getSuggestDirectory() {
+            return null;
+        }
+
+        //No-op: the wrapped reader is not closed here
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+
+    /**
+     * Writer which only ever adds documents to the underlying Lucene writer.
+     */
+    private static class NRTIndexWriter implements LuceneIndexWriter {
+        private final IndexWriter indexWriter;
+
+        public NRTIndexWriter(IndexWriter indexWriter) {
+            this.indexWriter = indexWriter;
+        }
+
+        @Override
+        public void updateDocument(String path, Iterable<? extends IndexableField> doc) throws IOException {
+            //For NRT case documents are never updated
+            //instead they are just added. This would cause duplicates
+            //That should be taken care at query side via unique cursor
+            indexWriter.addDocument(doc);
+        }
+
+        @Override
+        public void deleteDocuments(String path) throws IOException {
+            //Do not delete documents. Query side would handle it
+        }
+
+        /**
+         * Not supported; the owning {@link NRTIndex} closes the Lucene writer.
+         *
+         * @throws IllegalStateException always
+         */
+        @Override
+        public boolean close(long timestamp) throws IOException {
+            throw new IllegalStateException("Close should not be called");
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import javax.annotation.CheckForNull;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class NRTIndexFactory implements Closeable{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ListMultimap<String, NRTIndex> indexes = LinkedListMultimap.create();
+    private final IndexCopier indexCopier;
+
+    public NRTIndexFactory(IndexCopier indexCopier) {
+        this.indexCopier = checkNotNull(indexCopier);
+    }
+
+    //This would not be invoked concurrently
+    // but still mark it synchronized for safety
+    @CheckForNull
+    public synchronized NRTIndex createIndex(IndexDefinition definition) {
+        if (!definition.isSync()){
+            return null;
+        }
+        String indexPath = definition.getIndexPathFromConfig();
+        NRTIndex current = new NRTIndex(definition, indexCopier, getPrevious(indexPath));
+        indexes.put(indexPath, current);
+        closeLast(indexPath);
+        return current;
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (NRTIndex index : indexes.values()){
+            index.close();
+        }
+        indexes.clear();
+    }
+
+    List<NRTIndex> getIndexes(String path){
+        return indexes.get(path);
+    }
+
+    private void closeLast(String indexPath) {
+        List<NRTIndex> existing = indexes.get(indexPath);
+        if (existing.size() < 3){
+            return;
+        }
+        NRTIndex oldest = existing.remove(0);
+        try {
+            oldest.close();
+        } catch (IOException e) {
+            log.warn("Error occurred while closing index [{}]", oldest, e);
+        }
+    }
+
+    private NRTIndex getPrevious(String indexPath) {
+        List<NRTIndex> existing = indexes.get(indexPath);
+        if (existing.isEmpty()){
+            return null;
+        }
+        checkArgument(existing.size() <= 2, "Found [%s] more than 3 index", existing.size());
+        return existing.get(existing.size() - 1);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java Thu Sep 15 07:14:06 2016
@@ -77,6 +77,7 @@ public class IndexDefinitionTest {
     public void defaultConfig() throws Exception{
         IndexDefinition idxDefn = new IndexDefinition(root, builder.getNodeState());
         assertTrue(idxDefn.saveDirListing());
+        assertFalse(idxDefn.isSync());
     }
 
     @Test
@@ -866,6 +867,13 @@ public class IndexDefinitionTest {
 
     }
 
+    @Test
+    public void sync() throws Exception{
+        builder.setProperty(createProperty(IndexConstants.ASYNC_PROPERTY_NAME, of("sync" , "async"), STRINGS));
+        IndexDefinition idxDefn = new IndexDefinition(root, builder.getNodeState());
+        assertTrue(idxDefn.isSync());
+    }
+
     //TODO indexesAllNodesOfMatchingType - with nullCheckEnabled
 
     private static IndexingRule getRule(IndexDefinition defn, String typeName){

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java Thu Sep 15 07:14:06 2016
@@ -768,11 +768,11 @@ public class IndexPlannerTest {
     //------ END - Suggestion/spellcheck plan tests
 
     private IndexNode createIndexNode(IndexDefinition defn, long numOfDocs) throws IOException {
-        return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory(numOfDocs)).createReaders(defn, EMPTY_NODE, "foo"));
+        return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory(numOfDocs)).createReaders(defn, EMPTY_NODE, "foo"), null);
     }
 
     private IndexNode createIndexNode(IndexDefinition defn) throws IOException {
-        return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory()).createReaders(defn, EMPTY_NODE, "foo"));
+        return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory()).createReaders(defn, EMPTY_NODE, "foo"), null);
     }
 
     private FilterImpl createFilter(String nodeTypeName) {

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java Thu Sep 15 07:14:06 2016
@@ -2422,7 +2422,7 @@ public class LucenePropertyIndexTest ext
         return createIndex(index, name, propNames);
     }
 
-    static Tree createIndex(Tree index, String name, Set<String> propNames) throws CommitFailedException {
+    public static Tree createIndex(Tree index, String name, Set<String> propNames) throws CommitFailedException {
         Tree def = index.addChild(INDEX_DEFINITIONS_NAME).addChild(name);
         def.setProperty(JcrConstants.JCR_PRIMARYTYPE,
                 INDEX_DEFINITIONS_NODE_TYPE, Type.NAME);

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java Thu Sep 15 07:14:06 2016
@@ -140,7 +140,7 @@ public class MultiplexingLucenePropertyI
         LuceneIndexReaderFactory readerFactory = new DefaultIndexReaderFactory(mip, null);
         List<LuceneIndexReader> readers = readerFactory.createReaders(defn, builder.getNodeState(),"/foo");
 
-        IndexNode node = new IndexNode("foo", defn, readers);
+        IndexNode node = new IndexNode("foo", defn, readers, null);
 
         //3 Obtain the plan
         FilterImpl filter = createFilter("nt:base");

Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.core.SimpleCommitContext;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.api.Type.STRINGS;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.FieldFactory.newPathField;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexObserverTest.NOOP_EXECUTOR;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.newLucenePropertyIndexDefinition;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
+import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
+import static org.apache.jackrabbit.oak.spi.mount.Mounts.defaultMountInfoProvider;
+import static org.junit.Assert.*;
+
+public class DocumentQueueTest {
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+
+    private NodeState root = INITIAL_CONTENT;
+    private NodeBuilder builder = root.builder();
+    private EditorHook asyncHook;
+    private EditorHook syncHook;
+    private CommitInfo info;
+
+    private IndexTracker tracker = new IndexTracker();
+    private NRTIndexFactory indexFactory;
+
+    @Before
+    public void setUp() throws IOException {
+        IndexEditorProvider editorProvider = new LuceneIndexEditorProvider(
+                null,
+                null,
+                null,
+                defaultMountInfoProvider()
+        );
+
+        syncHook = new EditorHook(new IndexUpdateProvider(editorProvider));
+        asyncHook = new EditorHook(new IndexUpdateProvider(editorProvider, "async", false));
+    }
+
+    @Test
+    public void dropDocOnLimit() throws Exception{
+        DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, NOOP_EXECUTOR);
+        assertTrue(queue.add(LuceneDoc.forDelete("foo", "bar")));
+        assertTrue(queue.add(LuceneDoc.forDelete("foo", "bar")));
+
+        //3rd one would be dropped as queue size is 2
+        assertFalse(queue.add(LuceneDoc.forDelete("foo", "bar")));
+    }
+
+    @Test
+    public void noIssueIfNoIndex() throws Exception{
+        DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+        assertTrue(queue.add(LuceneDoc.forDelete("foo", "bar")));
+        assertTrue(queue.getQueuedDocs().isEmpty());
+    }
+
+    @Test
+    public void closeQueue() throws Exception{
+        DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+        queue.close();
+
+        try {
+            queue.add(LuceneDoc.forDelete("foo", "bar"));
+            fail();
+        } catch(IllegalStateException ignore){
+
+        }
+    }
+
+    @Test
+    public void noIssueIfNoWriter() throws Exception{
+        NodeState indexed = createAndPopulateAsyncIndex();
+        DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+
+        tracker.update(indexed);
+        assertTrue(queue.add(LuceneDoc.forDelete("/oak:index/fooIndex", "bar")));
+    }
+
+    @Test
+    public void updateDocument() throws Exception{
+        IndexTracker tracker = createTracker();
+        NodeState indexed = createAndPopulateAsyncIndex();
+        tracker.update(indexed);
+        DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+
+        Document d1 = new Document();
+        d1.add(newPathField("/a/b"));
+        d1.add(new StringField("foo", "a", Field.Store.NO));
+        queue.add(LuceneDoc.forUpdate("/oak:index/fooIndex", "/a/b", d1));
+
+        List<NRTIndex> indexes = indexFactory.getIndexes("/oak:index/fooIndex");
+        NRTIndex index = indexes.get(indexes.size() - 1);
+        assertEquals(1, index.getPrimaryReader().getReader().numDocs());
+    }
+
+    @Test
+    public void indexRefresh() throws Exception{
+        IndexTracker tracker = createTracker();
+        NodeState indexed = createAndPopulateAsyncIndex();
+        tracker.update(indexed);
+
+        Clock clock = new Clock.Virtual();
+        clock.waitUntil(System.currentTimeMillis());
+
+        DocumentQueue queue = new DocumentQueue(2, tracker, clock, sameThreadExecutor());
+
+        IndexNode indexNode = tracker.acquireIndexNode("/oak:index/fooIndex");
+        TopDocs td = doSearch(indexNode, "bar");
+        assertEquals(1, td.totalHits);
+
+        addDoc(queue, "/a/b", "bar");
+
+        //First update would be picked as base time was zero which would now
+        //get initialized
+        td = doSearch(indexNode, "bar");
+        assertEquals(2, td.totalHits);
+
+        addDoc(queue, "/a/c", "bar");
+
+        //Now it would not update as the refresh interval has not elapsed
+        td = doSearch(indexNode, "bar");
+        assertEquals(2, td.totalHits);
+
+        //Get past the delta time
+        clock.waitUntil(clock.getTime() + indexNode.getRefreshDelta() + 1);
+
+        addDoc(queue, "/a/d", "bar");
+
+        //Now it should show updated result
+        td = doSearch(indexNode, "bar");
+        assertEquals(4, td.totalHits);
+
+        //Phase 2 - Check effect of async index update cycle
+        //With that there should only be 2 copies of NRTIndex kept
+        indexed = doAsyncIndex(indexed, "a2", "bar");
+
+        indexNode.release();
+        tracker.update(indexed);
+        indexNode = tracker.acquireIndexNode("/oak:index/fooIndex");
+
+        //Now result would be latest from async + last local
+        td = doSearch(indexNode, "bar");
+        assertEquals(5, td.totalHits);
+
+        //Now there would be two NRTIndex instances - previous and current
+        //so add to current and query again
+        addDoc(queue, "/a/e", "bar");
+        td = doSearch(indexNode, "bar");
+        assertEquals(6, td.totalHits);
+
+        //Now do another async update
+        indexed = doAsyncIndex(indexed, "a3", "bar");
+
+        indexNode.release();
+        tracker.update(indexed);
+        indexNode = tracker.acquireIndexNode("/oak:index/fooIndex");
+
+        //Now total count would be 4
+        //3 from async and 1 from current
+        td = doSearch(indexNode, "bar");
+        assertEquals(4, td.totalHits);
+    }
+
+    private NodeState doAsyncIndex(NodeState current, String childName, String fooValue) throws CommitFailedException {
+        //Have some stuff to be indexed
+        NodeBuilder builder = current.builder();
+        builder.child(childName).setProperty("foo", fooValue);
+        NodeState after = builder.getNodeState();
+        return asyncHook.processCommit(current, after, newCommitInfo());
+    }
+
+    private TopDocs doSearch(IndexNode indexNode, String fooValue) throws IOException {
+        return indexNode.getSearcher().search(new TermQuery(new Term("foo", fooValue)), 10);
+    }
+
+    private void addDoc(DocumentQueue queue, String docPath, String fooValue) {
+        Document d1 = new Document();
+        d1.add(newPathField(docPath));
+        d1.add(new StringField("foo", fooValue, Field.Store.NO));
+        queue.add(LuceneDoc.forUpdate("/oak:index/fooIndex", docPath, d1));
+    }
+
+    private IndexTracker createTracker() throws IOException {
+        IndexCopier indexCopier = new IndexCopier(sameThreadExecutor(), temporaryFolder.getRoot());
+        indexFactory = new NRTIndexFactory(indexCopier);
+        return new IndexTracker(
+                new DefaultIndexReaderFactory(defaultMountInfoProvider(), indexCopier),
+                indexFactory
+        );
+    }
+
+    private NodeState createAndPopulateAsyncIndex() throws CommitFailedException {
+        createIndexDefinition("fooIndex");
+
+        //Have some stuff to be indexed
+        builder.child("a").setProperty("foo", "bar");
+        NodeState after = builder.getNodeState();
+        return asyncHook.processCommit(EMPTY_NODE, after, newCommitInfo());
+    }
+
+    private CommitInfo newCommitInfo(){
+        info = new CommitInfo("admin", "s1",
+                ImmutableMap.<String, Object>of(CommitContext.NAME, new SimpleCommitContext()));
+        return info;
+    }
+
+    private void createIndexDefinition(String idxName) {
+        NodeBuilder idx = newLucenePropertyIndexDefinition(builder.child("oak:index"),
+                idxName, ImmutableSet.of("foo"), "async");
+        idx.setProperty(createProperty(IndexConstants.ASYNC_PROPERTY_NAME, of("sync" , "async"), STRINGS));
+    }
+
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.ContentRepository;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory;
+import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.query.AbstractQueryTest;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
+import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
+import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.api.Type.STRINGS;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LucenePropertyIndexTest.createIndex;
+import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
+import static org.apache.jackrabbit.oak.spi.mount.Mounts.defaultMountInfoProvider;
+import static org.junit.Assert.assertNotNull;
+
+public class HybridIndexTest extends AbstractQueryTest {
+    private ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+    private NodeStore nodeStore;
+    private DocumentQueue queue;
+    private Clock clock = new Clock.Virtual();
+    private Whiteboard wb;
+
+    //TODO [hybrid] this needs to be obtained from NRTIndexFactory
+    private long refreshDelta = TimeUnit.SECONDS.toMillis(1);
+
+    @Override
+    protected ContentRepository createRepository() {
+        IndexCopier copier = null;
+        try {
+            copier = new IndexCopier(executorService, temporaryFolder.getRoot());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        MountInfoProvider mip = defaultMountInfoProvider();
+        LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider(copier,
+                null,
+                null,
+                mip);
+
+        NRTIndexFactory nrtIndexFactory = new NRTIndexFactory(copier);
+        LuceneIndexReaderFactory indexReaderFactory = new DefaultIndexReaderFactory(mip, copier);
+        IndexTracker tracker = new IndexTracker(indexReaderFactory,nrtIndexFactory);
+        LuceneIndexProvider provider = new LuceneIndexProvider(tracker);
+
+        queue = new DocumentQueue(100, tracker, clock, sameThreadExecutor());
+        LocalIndexObserver localIndexObserver = new LocalIndexObserver(queue);
+
+        nodeStore = new MemoryNodeStore();
+        Oak oak = new Oak(nodeStore)
+                .with(new InitialContent())
+                .with(new OpenSecurityProvider())
+                .with((QueryIndexProvider) provider)
+                .with((Observer) provider)
+                .with(localIndexObserver)
+                .with(editorProvider)
+                .with(new PropertyIndexEditorProvider())
+                .with(new NodeTypeIndexProvider())
+                .with(new NodeCounterEditorProvider())
+                //Effectively disable async indexing auto run
+                //such that we can control run timing as per test requirement
+                .withAsyncIndexing("async", TimeUnit.DAYS.toSeconds(1));
+
+        wb = oak.getWhiteboard();
+        return oak.createContentRepository();
+    }
+
+    @Test
+    public void hybridIndex() throws Exception{
+        String idxName = "hybridtest";
+        Tree idx = createIndex(root.getTree("/"), idxName, Collections.singleton("foo"));
+        idx.setProperty(createProperty(IndexConstants.ASYNC_PROPERTY_NAME, ImmutableSet.of("sync" , "async"), STRINGS));
+        root.commit();
+        //Run base reindex so reindex flag gets reset to false
+        runAsyncIndex();
+
+        //Get initial indexing done as local indexing only works
+        //for incremental indexing
+        createPath("/a").setProperty("foo", "bar");
+        root.commit();
+
+        //TODO This is required as LuceneIndexEditorContext has side effect of creating a facet
+        //config node
+        runAsyncIndex();
+
+        setTraversalEnabled(false);
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a"));
+
+        //Add new node. This would not be reflected in result as local index would not be updated
+        createPath("/b").setProperty("foo", "bar");
+        root.commit();
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a"));
+
+        //Now let some time elapse such that readers can be refreshed
+        clock.waitUntil(clock.getTime() + refreshDelta + 1);
+
+        //TODO This extra push would not be required once refresh also accounts for time
+        createPath("/c").setProperty("foo", "bar");
+        root.commit();
+
+        //Now recently added stuff should be visible without async indexing run
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a", "/b", "/c"));
+
+        //Post async index it should still be up to date
+        runAsyncIndex();
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a", "/b", "/c"));
+    }
+
+    private void runAsyncIndex() {
+        Runnable async = WhiteboardUtils.getService(wb, Runnable.class, new Predicate<Runnable>() {
+            @Override
+            public boolean apply(@Nullable Runnable input) {
+                return input instanceof AsyncIndexUpdate;
+            }
+        });
+        assertNotNull(async);
+        async.run();
+        root.refresh();
+    }
+
+    private Tree createPath(String path){
+        Tree base = root.getTree("/");
+        for (String e : PathUtils.elements(path)){
+            base = base.addChild(e);
+        }
+        return base;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.jackrabbit.oak.core.SimpleCommitContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+
+public class LocalIndexObserverTest {
+    static final Executor NOOP_EXECUTOR = new Executor() {
+        @Override
+        public void execute(Runnable command) {
+
+        }
+    };
+
+    private IndexTracker tracker = new IndexTracker();
+    private DocumentQueue collectingQueue;
+    private LocalIndexObserver observer;
+
+    @Before
+    public void setUp(){
+        collectingQueue = new DocumentQueue(10, tracker, Clock.SIMPLE, NOOP_EXECUTOR);
+        observer = new LocalIndexObserver(collectingQueue);
+    }
+
+    @Test
+    public void nullCommitInfo() throws Exception{
+        observer.contentChanged(EMPTY_NODE, null);
+    }
+
+    @Test
+    public void noCommitContext() throws Exception{
+        observer.contentChanged(EMPTY_NODE, CommitInfo.EMPTY);
+    }
+
+    @Test
+    public void noDocHolder() throws Exception{
+        observer.contentChanged(EMPTY_NODE, newCommitInfo());
+    }
+
+    @Test
+    public void docsAddedToQueue() throws Exception{
+        CommitInfo info = newCommitInfo();
+        CommitContext cc = (CommitContext) info.getInfo().get(CommitContext.NAME);
+
+        LuceneDocumentHolder holder = new LuceneDocumentHolder();
+        holder.getAsyncIndexedDocList("foo").add(LuceneDoc.forDelete("foo", "bar"));
+
+        cc.set(LuceneDocumentHolder.NAME, holder);
+
+        observer.contentChanged(EMPTY_NODE, info);
+
+        assertEquals(1, collectingQueue.getQueuedDocs().size());
+    }
+
+    private CommitInfo newCommitInfo(){
+        return new CommitInfo("admin", "s1",
+                ImmutableMap.<String, Object>of(CommitContext.NAME, new SimpleCommitContext()));
+    }
+
+
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native