You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/01/09 05:35:17 UTC

svn commit: r1777929 - in /jackrabbit/oak/trunk: oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ oak-lucene/src/test/java/org/apache/jackrabbit/oak...

Author: chetanm
Date: Mon Jan  9 05:35:17 2017
New Revision: 1777929

URL: http://svn.apache.org/viewvc?rev=1777929&view=rev
Log:
OAK-5421 - Add LuceneDoc directly to queue from LuceneIndexEditor

Added:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java   (with props)
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java Mon Jan  9 05:35:17 2017
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.IndexingQueue;
 import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexWriterFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LuceneDocumentHolder;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
@@ -60,6 +61,7 @@ public class LuceneIndexEditorProvider i
     private final IndexTracker indexTracker;
     private final MountInfoProvider mountInfoProvider;
     private GarbageCollectableBlobStore blobStore;
+    private IndexingQueue indexingQueue;
 
     /**
      * Number of indexed Lucene document that can be held in memory
@@ -170,6 +172,10 @@ public class LuceneIndexEditorProvider i
         return indexCopier;
     }
 
+    IndexingQueue getIndexingQueue() {
+        return indexingQueue;
+    }
+
     ExtractedTextCache getExtractedTextCache() {
         return extractedTextCache;
     }
@@ -181,7 +187,7 @@ public class LuceneIndexEditorProvider i
     private LuceneDocumentHolder getDocumentHolder(CommitContext commitContext){
         LuceneDocumentHolder holder = (LuceneDocumentHolder) commitContext.get(LuceneDocumentHolder.NAME);
         if (holder == null) {
-            holder = new LuceneDocumentHolder(inMemoryDocsLimit);
+            holder = new LuceneDocumentHolder(indexingQueue, inMemoryDocsLimit);
             commitContext.set(LuceneDocumentHolder.NAME, holder);
         }
         return holder;
@@ -191,6 +197,10 @@ public class LuceneIndexEditorProvider i
         this.blobStore = blobStore;
     }
 
+    public void setIndexingQueue(IndexingQueue indexingQueue) {
+        this.indexingQueue = indexingQueue;
+    }
+
     GarbageCollectableBlobStore getBlobStore() {
         return blobStore;
     }

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Mon Jan  9 05:35:17 2017
@@ -389,6 +389,10 @@ public class LuceneIndexProviderService
         }
         editorProvider.setBlobStore(blobStore);
 
+        if (hybridIndex){
+            editorProvider.setIndexingQueue(checkNotNull(documentQueue));
+        }
+
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put("type", "lucene");
         regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, props));

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java Mon Jan  9 05:35:17 2017
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
 
-public class DocumentQueue implements Closeable{
+public class DocumentQueue implements Closeable, IndexingQueue {
     private static final LuceneDoc STOP = LuceneDoc.forUpdate("", "", Collections.<IndexableField>emptyList());
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final IndexTracker tracker;
@@ -113,7 +113,7 @@ public class DocumentQueue implements Cl
 
                     addAllSynchronously(docsPerIndex.asMap());
 
-                    currentTask.onComplete(completionHandler);
+                    scheduleQueuedDocsProcessing();
                 } catch (Throwable t) {
                     exceptionHandler.uncaughtException(Thread.currentThread(), t);
                 }
@@ -142,6 +142,17 @@ public class DocumentQueue implements Cl
         this.dropped = sp.getMeter("HYBRID_DROPPED", StatsOptions.DEFAULT);
     }
 
+    @Override
+    public boolean addIfNotFullWithoutWait(LuceneDoc doc){
+        checkState(!stopped);
+        boolean added = docsQueue.offer(doc);
+        if (added) {
+            queueSizeStats.inc();
+        }
+        return added;
+    }
+
+    @Override
     public boolean add(LuceneDoc doc){
         checkState(!stopped);
         boolean added = false;
@@ -150,10 +161,8 @@ public class DocumentQueue implements Cl
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
-        // Set the completion handler on the currently running task. Multiple calls
-        // to onComplete are not a problem here since we always pass the same value.
-        // Thus there is no question as to which of the handlers will effectively run.
-        currentTask.onComplete(completionHandler);
+        scheduleQueuedDocsProcessing();
+
         if (added) {
             queueSizeStats.inc();
         } else {
@@ -162,6 +171,15 @@ public class DocumentQueue implements Cl
         return added;
     }
 
+    @Override
+    public void scheduleQueuedDocsProcessing() {
+        // Set the completion handler on the currently running task. Multiple calls
+        // to onComplete are not a problem here since we always pass the same value.
+        // Thus there is no question as to which of the handlers will effectively run.
+        currentTask.onComplete(completionHandler);
+    }
+
+    @Override
     public void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex) {
         //If required it can optimized by indexing diff indexes in parallel
         //Something to consider if it becomes a bottleneck
@@ -192,6 +210,7 @@ public class DocumentQueue implements Cl
 
         try{
             LuceneIndexWriter writer = indexNode.getLocalWriter();
+            boolean docAdded = false;
             for (LuceneDoc doc : docs) {
                 if (writer == null) {
                     //IndexDefinition per IndexNode might have changed and local
@@ -200,14 +219,23 @@ public class DocumentQueue implements Cl
                             "entry for [{}]", indexPath, doc.docPath);
                     return;
                 }
+                if (doc.isProcessed()){
+                    //Skip already processed doc entry
+                    continue;
+                } else {
+                    doc.markProcessed();
+                }
                 if (doc.delete) {
                     writer.deleteDocuments(doc.docPath);
                 } else {
                     writer.updateDocument(doc.docPath, doc.doc);
                 }
+                docAdded = true;
                 log.trace("Updated index with doc {}", doc);
             }
-            indexNode.refreshReadersOnWriteIfRequired();
+            if (docAdded) {
+                indexNode.refreshReadersOnWriteIfRequired();
+            }
         } catch (Exception e) {
             //For now we just log it. Later we need to see if frequent error then to
             //temporarily disable indexing for this index

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java?rev=1777929&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java Mon Jan  9 05:35:17 2017
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface IndexingQueue {
+
+    /**
+     * Adds the given doc to a queue without any wait
+     *
+     * @param doc to be added
+     * @return true if the doc was added to the queue
+     */
+    boolean addIfNotFullWithoutWait(LuceneDoc doc);
+
+    /**
+     * Adds the given doc to a queue with possible wait if queue is full.
+     * The wait would be having an upper limit
+     *
+     * @param doc LuceneDoc to be added
+     * @return true if the doc was added to the queue
+     */
+    boolean add(LuceneDoc doc);
+
+    /**
+     * The docs are added directly to the index without any queuing
+     *
+     * @param docsPerIndex map of LuceneDoc per index path
+     */
+    void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex);
+
+    /**
+     * Schedules the async processing of queued entries
+     */
+    void scheduleQueuedDocsProcessing();
+
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java Mon Jan  9 05:35:17 2017
@@ -23,11 +23,12 @@ import javax.annotation.Nullable;
 
 import org.apache.lucene.index.IndexableField;
 
-class LuceneDoc {
+class LuceneDoc implements LuceneDocInfo {
     final String indexPath;
     final String docPath;
     final Iterable<? extends IndexableField> doc;
     final boolean delete;
+    private volatile boolean processed;
 
     public static LuceneDoc forUpdate(String indexPath, String path, Iterable<? extends IndexableField> doc){
         return new LuceneDoc(indexPath, path, doc, false);
@@ -48,4 +49,24 @@ class LuceneDoc {
     public String toString() {
         return String.format("%s(%s)", indexPath, docPath);
     }
+
+    public boolean isProcessed() {
+        return processed;
+    }
+
+    public void markProcessed(){
+        processed = true;
+    }
+
+    //~-------------------------------< LuceneDocInfo >
+
+    @Override
+    public String getIndexPath() {
+        return indexPath;
+    }
+
+    @Override
+    public String getDocPath() {
+        return docPath;
+    }
 }

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java?rev=1777929&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java Mon Jan  9 05:35:17 2017
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+/**
+ * Defines just the info part of a LuceneDoc
+ */
+interface LuceneDocInfo {
+
+    String getIndexPath();
+
+    String getDocPath();
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java Mon Jan  9 05:35:17 2017
@@ -20,28 +20,33 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class LuceneDocumentHolder {
     private static final Logger log = LoggerFactory.getLogger(LuceneDocumentHolder.class);
     public static final String NAME = "oak.lucene.documentHolder";
 
     private final ListMultimap<String, LuceneDoc> nrtIndexedList = ArrayListMultimap.create();
     private final ListMultimap<String, LuceneDoc> syncIndexedList = ArrayListMultimap.create();
+    private final ListMultimap<String, String> queuedNrtIndexedPath = ArrayListMultimap.create();
+    private final ListMultimap<String, LuceneDoc> queuedSyncIndexedPath = ArrayListMultimap.create();
     private final int inMemoryDocsLimit;
+    private final IndexingQueue documentQueue;
     private boolean limitWarningLogged;
+    private boolean docAddedToQueue;
+    private boolean schedulingDone;
 
-    public LuceneDocumentHolder(){
-        this(500);
-    }
-
-    public LuceneDocumentHolder(int inMemoryDocsLimit) {
+    public LuceneDocumentHolder(IndexingQueue documentQueue, int inMemoryDocsLimit) {
+        this.documentQueue = checkNotNull(documentQueue);
         this.inMemoryDocsLimit = inMemoryDocsLimit;
     }
 
@@ -50,29 +55,50 @@ public class LuceneDocumentHolder {
     }
 
     public Map<String, Collection<LuceneDoc>> getSyncIndexedDocs(){
+        for (Map.Entry<String, LuceneDoc> e : queuedSyncIndexedPath.entries()){
+            if (!e.getValue().isProcessed()){
+                syncIndexedList.put(e.getKey(), e.getValue());
+            }
+        }
         return syncIndexedList.asMap();
     }
 
     public void add(boolean sync, LuceneDoc doc) {
-        if (sync){
-            getSyncIndexedDocList(doc.indexPath).add(doc);
+        //First try adding to queue in non blocking manner
+        if (documentQueue.addIfNotFullWithoutWait(doc)){
+            if (sync){
+                queuedSyncIndexedPath.put(doc.indexPath, doc);
+            } else {
+                queuedNrtIndexedPath.put(doc.indexPath, doc.docPath);
+            }
+            docAddedToQueue = true;
         } else {
-            if (queueSizeWithinLimits()) {
-                getNRTIndexedDocList(doc.indexPath).add(doc);
+            //Queue is full so keep it in memory
+            if (sync) {
+                syncIndexedList.put(doc.indexPath, doc);
+            } else {
+                if (queueSizeWithinLimits()) {
+                    nrtIndexedList.put(doc.indexPath, doc);
+                }
             }
         }
     }
 
     public void done(String indexPath) {
-
-    }
-
-    List<LuceneDoc> getNRTIndexedDocList(String indexPath) {
-        return nrtIndexedList.get(indexPath);
+        //Hints the queue to process the queued docs in batch
+        if (docAddedToQueue && !schedulingDone) {
+            documentQueue.scheduleQueuedDocsProcessing();
+            schedulingDone = true;
+        }
     }
 
-    List<LuceneDoc> getSyncIndexedDocList(String indexPath) {
-        return syncIndexedList.get(indexPath);
+    /**
+     * Returns an iterable for all indexed paths handled by this holder instance. The paths
+     * may be directly forwarded to the queue or held in memory for later processing
+     */
+    Iterable<? extends LuceneDocInfo> getAllLuceneDocInfo(){
+        return Iterables.concat(nrtIndexedList.values(), syncIndexedList.values(),
+                asLuceneDocInfo(queuedNrtIndexedPath), queuedSyncIndexedPath.values());
     }
 
     private boolean queueSizeWithinLimits(){
@@ -86,4 +112,23 @@ public class LuceneDocumentHolder {
         }
         return true;
     }
+
+    private static Iterable<? extends LuceneDocInfo> asLuceneDocInfo(ListMultimap<String, String> docs) {
+        return Iterables.transform(docs.entries(), new Function<Map.Entry<String, String>, LuceneDocInfo>() {
+            @Override
+            public LuceneDocInfo apply(final Map.Entry<String, String> input) {
+                return new LuceneDocInfo() {
+                    @Override
+                    public String getIndexPath() {
+                        return input.getKey();
+                    }
+
+                    @Override
+                    public String getDocPath() {
+                        return input.getValue();
+                    }
+                };
+            }
+        });
+    }
 }

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java Mon Jan  9 05:35:17 2017
@@ -28,6 +28,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
 import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.IndexingMode;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.DocumentQueue;
 import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
@@ -36,10 +37,8 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Test;
 
-import static com.google.common.collect.ImmutableSet.of;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.newLucenePropertyIndexDefinition;
-import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
 import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -58,6 +57,7 @@ public class LuceneIndexEditorProviderTe
                 null,
                 null,
                 Mounts.defaultMountInfoProvider());
+        editorProvider.setIndexingQueue(mock(DocumentQueue.class));
 
         IndexUpdateCallback callback = new TestCallback("/oak:index/fooIndex", newCommitInfo(), false, false);
         NodeBuilder defnBuilder = createIndexDefinition("fooIndex").builder();
@@ -83,7 +83,7 @@ public class LuceneIndexEditorProviderTe
                 null,
                 null,
                 Mounts.defaultMountInfoProvider());
-
+        editorProvider.setIndexingQueue(mock(DocumentQueue.class));
         //Set up a different IndexDefinition which needs to be returned
         //from tracker with a marker property
         NodeBuilder testBuilder = createIndexDefinition("fooIndex").builder();

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java Mon Jan  9 05:35:17 2017
@@ -99,6 +99,7 @@ public class LuceneIndexProviderServiceT
         LuceneIndexEditorProvider editorProvider =
                 (LuceneIndexEditorProvider) context.getService(IndexEditorProvider.class);
         assertNotNull(editorProvider.getIndexCopier());
+        assertNotNull(editorProvider.getIndexingQueue());
 
         IndexCopier indexCopier = service.getIndexCopier();
         assertNotNull("IndexCopier should be initialized as CopyOnRead is enabled by default", indexCopier);

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java Mon Jan  9 05:35:17 2017
@@ -41,6 +41,7 @@ import org.apache.jackrabbit.oak.api.Con
 import org.apache.jackrabbit.oak.api.Tree;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditorProvider;
@@ -73,6 +74,7 @@ import org.apache.jackrabbit.oak.spi.whi
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -98,6 +100,11 @@ public class HybridIndexTest extends Abs
 
     private long refreshDelta = TimeUnit.SECONDS.toMillis(1);
 
+    @After
+    public void tearDown(){
+        new ExecutorCloser(executorService).close();
+    }
+
     @Override
     protected ContentRepository createRepository() {
         IndexCopier copier;
@@ -112,14 +119,14 @@ public class HybridIndexTest extends Abs
         LuceneIndexReaderFactory indexReaderFactory = new DefaultIndexReaderFactory(mip, copier);
         IndexTracker tracker = new IndexTracker(indexReaderFactory,nrtIndexFactory);
         LuceneIndexProvider provider = new LuceneIndexProvider(tracker);
-
+        queue = new DocumentQueue(100, tracker, sameThreadExecutor());
         LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider(copier,
                 tracker,
                 null,
                 null,
                 mip);
+        editorProvider.setIndexingQueue(queue);
 
-        queue = new DocumentQueue(100, tracker, sameThreadExecutor());
         LocalIndexObserver localIndexObserver = new LocalIndexObserver(queue, StatisticsProvider.NOOP);
 
         nodeStore = new MemoryNodeStore();

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java Mon Jan  9 05:35:17 2017
@@ -72,8 +72,8 @@ public class LocalIndexObserverTest {
         CommitInfo info = newCommitInfo();
         CommitContext cc = (CommitContext) info.getInfo().get(CommitContext.NAME);
 
-        LuceneDocumentHolder holder = new LuceneDocumentHolder();
-        holder.getNRTIndexedDocList("foo").add(LuceneDoc.forDelete("foo", "bar"));
+        LuceneDocumentHolder holder = new LuceneDocumentHolder(collectingQueue, 500);
+        holder.add(false, LuceneDoc.forDelete("foo", "bar"));
 
         cc.set(LuceneDocumentHolder.NAME, holder);
 

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java Mon Jan  9 05:35:17 2017
@@ -20,11 +20,14 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
 
 import java.io.IOException;
+import java.util.List;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.core.SimpleCommitContext;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
 import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.IndexingMode;
 import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil;
@@ -39,6 +42,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
 import static org.junit.Assert.*;
@@ -51,16 +55,19 @@ public class LocalIndexWriterFactoryTest
     private EditorHook asyncHook;
     private CommitInfo info;
     private LuceneIndexEditorProvider editorProvider;
+    private IndexTracker tracker;
 
     @Before
     public void setUp() throws IOException {
+        tracker = new IndexTracker();
+        DocumentQueue queue = new DocumentQueue(100, tracker, sameThreadExecutor());
         editorProvider = new LuceneIndexEditorProvider(
                 null,
                 null,
                 null,
                 Mounts.defaultMountInfoProvider()
         );
-
+        editorProvider.setIndexingQueue(queue);
         syncHook = new EditorHook(new IndexUpdateProvider(editorProvider));
         asyncHook = new EditorHook(new IndexUpdateProvider(editorProvider, "async", false));
     }
@@ -97,7 +104,7 @@ public class LocalIndexWriterFactoryTest
         assertNotNull(holder);
 
         //2 add none for delete
-        assertEquals(2, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+        assertEquals(2, getIndexedDocList(holder, "/oak:index/fooIndex").size());
     }
 
     @Test
@@ -114,10 +121,10 @@ public class LocalIndexWriterFactoryTest
         assertNotNull(holder);
 
         //1 add  - bar
-        assertEquals(1, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+        assertEquals(1, getIndexedDocList(holder, "/oak:index/fooIndex").size());
 
         //1 add  - bar
-        assertEquals(1, holder.getNRTIndexedDocList("/oak:index/barIndex").size());
+        assertEquals(1, getIndexedDocList(holder, "/oak:index/barIndex").size());
 
     }
 
@@ -134,14 +141,14 @@ public class LocalIndexWriterFactoryTest
         assertNotNull(holder);
 
         //2 add none for delete
-        assertEquals(2, holder.getSyncIndexedDocList("/oak:index/fooIndex").size());
-        assertEquals(0, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+        assertEquals(2, getIndexedDocList(holder,"/oak:index/fooIndex").size());
     }
 
     @Test
     public void inMemoryDocLimit() throws Exception{
         NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.NRT);
         editorProvider.setInMemoryDocsLimit(5);
+        editorProvider.setIndexingQueue(new DocumentQueue(1, tracker, sameThreadExecutor()));
         builder = indexed.builder();
         for (int i = 0; i < 10; i++) {
             builder.child("b" + i).setProperty("foo", "bar");
@@ -150,7 +157,8 @@ public class LocalIndexWriterFactoryTest
         syncHook.processCommit(indexed, after, newCommitInfo());
 
         LuceneDocumentHolder holder = getHolder();
-        assertEquals(5, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+        //5 for in memory list and 1 in queue
+        assertEquals(5 + 1, getIndexedDocList(holder, "/oak:index/fooIndex").size());
     }
 
     private NodeState createAndPopulateAsyncIndex(IndexingMode indexingMode) throws CommitFailedException {
@@ -195,4 +203,13 @@ public class LocalIndexWriterFactoryTest
         builder.child("oak:index").setChildNode(idxName, idx.build());
     }
 
+    private static List<String> getIndexedDocList(LuceneDocumentHolder holder, String indexPath){
+        List<String> paths = Lists.newArrayList();
+        for (LuceneDocInfo doc : holder.getAllLuceneDocInfo()){
+            if (doc.getIndexPath().equals(indexPath)){
+                paths.add(doc.getDocPath());
+            }
+        }
+        return paths;
+    }
 }
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java?rev=1777929&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java Mon Jan  9 05:35:17 2017
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItems;
+import static org.junit.Assert.assertThat;
+
+public class LuceneDocumentHolderTest {
+    private DummyQueue queue = new DummyQueue();
+    private LuceneDocumentHolder holder = new LuceneDocumentHolder(queue, 100);
+
+    @Test
+    public void testAllLuceneDocReturned() throws Exception{
+        queue.enabled = false;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/a"));
+        holder.add(false, LuceneDoc.forDelete("/oak:index/bar", "/b"));
+
+        queue.enabled = true;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/c"));
+        holder.add(false, LuceneDoc.forDelete("/oak:index/bar", "/d"));
+
+        assertThat(asMultiMap(holder.getAllLuceneDocInfo()).values(), hasItems("/a", "/b", "/c", "/d"));
+    }
+
+    @Test
+    public void unprocessedSyncQueuedDocs() throws Exception{
+        queue.enabled = true;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/a"));
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/c"));
+
+        queue.enabled = false;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/b"));
+
+        queue.luceneDocs.get("/c").markProcessed();
+
+        assertThat(asMultiMap(holder.getSyncIndexedDocs()).values(), containsInAnyOrder("/a", "/b"));
+    }
+
+    private static ListMultimap<String, String> asMultiMap(Iterable<? extends LuceneDocInfo> itr){
+        ListMultimap<String, String> docs = ArrayListMultimap.create();
+        for (LuceneDocInfo d : itr){
+            docs.put(d.getIndexPath(), d.getDocPath());
+        }
+        return docs;
+    }
+
+    private static ListMultimap<String, String> asMultiMap(Map<String, Collection<LuceneDoc>> map){
+        ListMultimap<String, String> docs = ArrayListMultimap.create();
+        for (Collection<LuceneDoc> lds : map.values()){
+            for (LuceneDoc d : lds){
+                docs.put(d.getIndexPath(), d.getDocPath());
+            }
+        }
+        return docs;
+    }
+
+    private static class DummyQueue implements IndexingQueue {
+        boolean enabled;
+        ListMultimap<String, String> docs = ArrayListMultimap.create();
+        Map<String, LuceneDoc> luceneDocs = Maps.newHashMap();
+
+        @Override
+        public boolean addIfNotFullWithoutWait(LuceneDoc doc) {
+            if (enabled){
+                docs.put(doc.indexPath, doc.docPath);
+                luceneDocs.put(doc.docPath, doc);
+            }
+            return enabled;
+        }
+
+        @Override
+        public boolean add(LuceneDoc doc) {
+            throw new AbstractMethodError();
+        }
+
+        @Override
+        public void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex) {
+            throw new AbstractMethodError();
+        }
+
+        @Override
+        public void scheduleQueuedDocsProcessing() {
+
+        }
+    }
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java Mon Jan  9 05:35:17 2017
@@ -338,6 +338,7 @@ public class HybridIndexTest extends Abs
 
         queue = new DocumentQueue(queueSize, tracker, executorService, statsProvider);
         localIndexObserver = new LocalIndexObserver(queue, statsProvider);
+        luceneEditorProvider.setIndexingQueue(queue);
     }
 
     private void runAsyncIndex() {