You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/01/09 05:35:17 UTC
svn commit: r1777929 - in /jackrabbit/oak/trunk:
oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/
oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/
oak-lucene/src/test/java/org/apache/jackrabbit/oak...
Author: chetanm
Date: Mon Jan 9 05:35:17 2017
New Revision: 1777929
URL: http://svn.apache.org/viewvc?rev=1777929&view=rev
Log:
OAK-5421 - Add LuceneDoc directly to queue from LuceneIndexEditor
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java Mon Jan 9 05:35:17 2017
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.IndexingQueue;
import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexWriterFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LuceneDocumentHolder;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
@@ -60,6 +61,7 @@ public class LuceneIndexEditorProvider i
private final IndexTracker indexTracker;
private final MountInfoProvider mountInfoProvider;
private GarbageCollectableBlobStore blobStore;
+ private IndexingQueue indexingQueue;
/**
* Number of indexed Lucene document that can be held in memory
@@ -170,6 +172,10 @@ public class LuceneIndexEditorProvider i
return indexCopier;
}
+ IndexingQueue getIndexingQueue() {
+ return indexingQueue;
+ }
+
ExtractedTextCache getExtractedTextCache() {
return extractedTextCache;
}
@@ -181,7 +187,7 @@ public class LuceneIndexEditorProvider i
private LuceneDocumentHolder getDocumentHolder(CommitContext commitContext){
LuceneDocumentHolder holder = (LuceneDocumentHolder) commitContext.get(LuceneDocumentHolder.NAME);
if (holder == null) {
- holder = new LuceneDocumentHolder(inMemoryDocsLimit);
+ holder = new LuceneDocumentHolder(indexingQueue, inMemoryDocsLimit);
commitContext.set(LuceneDocumentHolder.NAME, holder);
}
return holder;
@@ -191,6 +197,10 @@ public class LuceneIndexEditorProvider i
this.blobStore = blobStore;
}
+ public void setIndexingQueue(IndexingQueue indexingQueue) {
+ this.indexingQueue = indexingQueue;
+ }
+
GarbageCollectableBlobStore getBlobStore() {
return blobStore;
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Mon Jan 9 05:35:17 2017
@@ -389,6 +389,10 @@ public class LuceneIndexProviderService
}
editorProvider.setBlobStore(blobStore);
+ if (hybridIndex){
+ editorProvider.setIndexingQueue(checkNotNull(documentQueue));
+ }
+
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put("type", "lucene");
regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, props));
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java Mon Jan 9 05:35:17 2017
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkState;
-public class DocumentQueue implements Closeable{
+public class DocumentQueue implements Closeable, IndexingQueue {
private static final LuceneDoc STOP = LuceneDoc.forUpdate("", "", Collections.<IndexableField>emptyList());
private final Logger log = LoggerFactory.getLogger(getClass());
private final IndexTracker tracker;
@@ -113,7 +113,7 @@ public class DocumentQueue implements Cl
addAllSynchronously(docsPerIndex.asMap());
- currentTask.onComplete(completionHandler);
+ scheduleQueuedDocsProcessing();
} catch (Throwable t) {
exceptionHandler.uncaughtException(Thread.currentThread(), t);
}
@@ -142,6 +142,17 @@ public class DocumentQueue implements Cl
this.dropped = sp.getMeter("HYBRID_DROPPED", StatsOptions.DEFAULT);
}
+ @Override
+ public boolean addIfNotFullWithoutWait(LuceneDoc doc){
+ checkState(!stopped);
+ boolean added = docsQueue.offer(doc);
+ if (added) {
+ queueSizeStats.inc();
+ }
+ return added;
+ }
+
+ @Override
public boolean add(LuceneDoc doc){
checkState(!stopped);
boolean added = false;
@@ -150,10 +161,8 @@ public class DocumentQueue implements Cl
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- // Set the completion handler on the currently running task. Multiple calls
- // to onComplete are not a problem here since we always pass the same value.
- // Thus there is no question as to which of the handlers will effectively run.
- currentTask.onComplete(completionHandler);
+ scheduleQueuedDocsProcessing();
+
if (added) {
queueSizeStats.inc();
} else {
@@ -162,6 +171,15 @@ public class DocumentQueue implements Cl
return added;
}
+ @Override
+ public void scheduleQueuedDocsProcessing() {
+ // Set the completion handler on the currently running task. Multiple calls
+ // to onComplete are not a problem here since we always pass the same value.
+ // Thus there is no question as to which of the handlers will effectively run.
+ currentTask.onComplete(completionHandler);
+ }
+
+ @Override
public void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex) {
//If required it can optimized by indexing diff indexes in parallel
//Something to consider if it becomes a bottleneck
@@ -192,6 +210,7 @@ public class DocumentQueue implements Cl
try{
LuceneIndexWriter writer = indexNode.getLocalWriter();
+ boolean docAdded = false;
for (LuceneDoc doc : docs) {
if (writer == null) {
//IndexDefinition per IndexNode might have changed and local
@@ -200,14 +219,23 @@ public class DocumentQueue implements Cl
"entry for [{}]", indexPath, doc.docPath);
return;
}
+ if (doc.isProcessed()){
+ //Skip already processed doc entry
+ continue;
+ } else {
+ doc.markProcessed();
+ }
if (doc.delete) {
writer.deleteDocuments(doc.docPath);
} else {
writer.updateDocument(doc.docPath, doc.doc);
}
+ docAdded = true;
log.trace("Updated index with doc {}", doc);
}
- indexNode.refreshReadersOnWriteIfRequired();
+ if (docAdded) {
+ indexNode.refreshReadersOnWriteIfRequired();
+ }
} catch (Exception e) {
//For now we just log it. Later we need to see if frequent error then to
//temporarily disable indexing for this index
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java?rev=1777929&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java Mon Jan 9 05:35:17 2017
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface IndexingQueue {
+
+ /**
+ * Adds the given doc to the queue without waiting; the doc is not added if the queue is full
+ *
+ * @param doc to be added
+ * @return true if the doc was added to the queue
+ */
+ boolean addIfNotFullWithoutWait(LuceneDoc doc);
+
+ /**
+ * Adds the given doc to the queue, possibly waiting if the queue is full.
+ * The wait is bounded by an upper limit
+ *
+ * @param doc LuceneDoc to be added
+ * @return true if the doc was added to the queue
+ */
+ boolean add(LuceneDoc doc);
+
+ /**
+ * The docs are added directly to the index without any queuing
+ *
+ * @param docsPerIndex map of LuceneDoc per index path
+ */
+ void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex);
+
+ /**
+ * Schedules the async processing of queued entries
+ */
+ void scheduleQueuedDocsProcessing();
+
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexingQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java Mon Jan 9 05:35:17 2017
@@ -23,11 +23,12 @@ import javax.annotation.Nullable;
import org.apache.lucene.index.IndexableField;
-class LuceneDoc {
+class LuceneDoc implements LuceneDocInfo {
final String indexPath;
final String docPath;
final Iterable<? extends IndexableField> doc;
final boolean delete;
+ private volatile boolean processed;
public static LuceneDoc forUpdate(String indexPath, String path, Iterable<? extends IndexableField> doc){
return new LuceneDoc(indexPath, path, doc, false);
@@ -48,4 +49,24 @@ class LuceneDoc {
public String toString() {
return String.format("%s(%s)", indexPath, docPath);
}
+
+ public boolean isProcessed() {
+ return processed;
+ }
+
+ public void markProcessed(){
+ processed = true;
+ }
+
+ //~-------------------------------< LuceneDocInfo >
+
+ @Override
+ public String getIndexPath() {
+ return indexPath;
+ }
+
+ @Override
+ public String getDocPath() {
+ return docPath;
+ }
}
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java?rev=1777929&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java Mon Jan 9 05:35:17 2017
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+/**
+ * Defines just the info part of a LuceneDoc
+ */
+interface LuceneDocInfo {
+
+ String getIndexPath();
+
+ String getDocPath();
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java Mon Jan 9 05:35:17 2017
@@ -20,28 +20,33 @@
package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
+import com.google.common.base.Function;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class LuceneDocumentHolder {
private static final Logger log = LoggerFactory.getLogger(LuceneDocumentHolder.class);
public static final String NAME = "oak.lucene.documentHolder";
private final ListMultimap<String, LuceneDoc> nrtIndexedList = ArrayListMultimap.create();
private final ListMultimap<String, LuceneDoc> syncIndexedList = ArrayListMultimap.create();
+ private final ListMultimap<String, String> queuedNrtIndexedPath = ArrayListMultimap.create();
+ private final ListMultimap<String, LuceneDoc> queuedSyncIndexedPath = ArrayListMultimap.create();
private final int inMemoryDocsLimit;
+ private final IndexingQueue documentQueue;
private boolean limitWarningLogged;
+ private boolean docAddedToQueue;
+ private boolean schedulingDone;
- public LuceneDocumentHolder(){
- this(500);
- }
-
- public LuceneDocumentHolder(int inMemoryDocsLimit) {
+ public LuceneDocumentHolder(IndexingQueue documentQueue, int inMemoryDocsLimit) {
+ this.documentQueue = checkNotNull(documentQueue);
this.inMemoryDocsLimit = inMemoryDocsLimit;
}
@@ -50,29 +55,50 @@ public class LuceneDocumentHolder {
}
public Map<String, Collection<LuceneDoc>> getSyncIndexedDocs(){
+ for (Map.Entry<String, LuceneDoc> e : queuedSyncIndexedPath.entries()){
+ if (!e.getValue().isProcessed()){
+ syncIndexedList.put(e.getKey(), e.getValue());
+ }
+ }
return syncIndexedList.asMap();
}
public void add(boolean sync, LuceneDoc doc) {
- if (sync){
- getSyncIndexedDocList(doc.indexPath).add(doc);
+ //First try adding to queue in non blocking manner
+ if (documentQueue.addIfNotFullWithoutWait(doc)){
+ if (sync){
+ queuedSyncIndexedPath.put(doc.indexPath, doc);
+ } else {
+ queuedNrtIndexedPath.put(doc.indexPath, doc.docPath);
+ }
+ docAddedToQueue = true;
} else {
- if (queueSizeWithinLimits()) {
- getNRTIndexedDocList(doc.indexPath).add(doc);
+ //Queue is full so keep it in memory
+ if (sync) {
+ syncIndexedList.put(doc.indexPath, doc);
+ } else {
+ if (queueSizeWithinLimits()) {
+ nrtIndexedList.put(doc.indexPath, doc);
+ }
}
}
}
public void done(String indexPath) {
-
- }
-
- List<LuceneDoc> getNRTIndexedDocList(String indexPath) {
- return nrtIndexedList.get(indexPath);
+ //Hints the queue to process the queued docs in batch
+ if (docAddedToQueue && !schedulingDone) {
+ documentQueue.scheduleQueuedDocsProcessing();
+ schedulingDone = true;
+ }
}
- List<LuceneDoc> getSyncIndexedDocList(String indexPath) {
- return syncIndexedList.get(indexPath);
+ /**
+ * Returns an iterable over all indexed paths handled by this holder instance. The paths
+ * may have been forwarded directly to the queue or may be held in memory for later processing
+ */
+ Iterable<? extends LuceneDocInfo> getAllLuceneDocInfo(){
+ return Iterables.concat(nrtIndexedList.values(), syncIndexedList.values(),
+ asLuceneDocInfo(queuedNrtIndexedPath), queuedSyncIndexedPath.values());
}
private boolean queueSizeWithinLimits(){
@@ -86,4 +112,23 @@ public class LuceneDocumentHolder {
}
return true;
}
+
+ private static Iterable<? extends LuceneDocInfo> asLuceneDocInfo(ListMultimap<String, String> docs) {
+ return Iterables.transform(docs.entries(), new Function<Map.Entry<String, String>, LuceneDocInfo>() {
+ @Override
+ public LuceneDocInfo apply(final Map.Entry<String, String> input) {
+ return new LuceneDocInfo() {
+ @Override
+ public String getIndexPath() {
+ return input.getKey();
+ }
+
+ @Override
+ public String getDocPath() {
+ return input.getValue();
+ }
+ };
+ }
+ });
+ }
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java Mon Jan 9 05:35:17 2017
@@ -28,6 +28,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.IndexingMode;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.DocumentQueue;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Editor;
@@ -36,10 +37,8 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.junit.Test;
-import static com.google.common.collect.ImmutableSet.of;
import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.newLucenePropertyIndexDefinition;
-import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -58,6 +57,7 @@ public class LuceneIndexEditorProviderTe
null,
null,
Mounts.defaultMountInfoProvider());
+ editorProvider.setIndexingQueue(mock(DocumentQueue.class));
IndexUpdateCallback callback = new TestCallback("/oak:index/fooIndex", newCommitInfo(), false, false);
NodeBuilder defnBuilder = createIndexDefinition("fooIndex").builder();
@@ -83,7 +83,7 @@ public class LuceneIndexEditorProviderTe
null,
null,
Mounts.defaultMountInfoProvider());
-
+ editorProvider.setIndexingQueue(mock(DocumentQueue.class));
//Set up a different IndexDefinition which needs to be returned
//from tracker with a marker property
NodeBuilder testBuilder = createIndexDefinition("fooIndex").builder();
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java Mon Jan 9 05:35:17 2017
@@ -99,6 +99,7 @@ public class LuceneIndexProviderServiceT
LuceneIndexEditorProvider editorProvider =
(LuceneIndexEditorProvider) context.getService(IndexEditorProvider.class);
assertNotNull(editorProvider.getIndexCopier());
+ assertNotNull(editorProvider.getIndexingQueue());
IndexCopier indexCopier = service.getIndexCopier();
assertNotNull("IndexCopier should be initialized as CopyOnRead is enabled by default", indexCopier);
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java Mon Jan 9 05:35:17 2017
@@ -41,6 +41,7 @@ import org.apache.jackrabbit.oak.api.Con
import org.apache.jackrabbit.oak.api.Tree;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditorProvider;
@@ -73,6 +74,7 @@ import org.apache.jackrabbit.oak.spi.whi
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -98,6 +100,11 @@ public class HybridIndexTest extends Abs
private long refreshDelta = TimeUnit.SECONDS.toMillis(1);
+ @After
+ public void tearDown(){
+ new ExecutorCloser(executorService).close();
+ }
+
@Override
protected ContentRepository createRepository() {
IndexCopier copier;
@@ -112,14 +119,14 @@ public class HybridIndexTest extends Abs
LuceneIndexReaderFactory indexReaderFactory = new DefaultIndexReaderFactory(mip, copier);
IndexTracker tracker = new IndexTracker(indexReaderFactory,nrtIndexFactory);
LuceneIndexProvider provider = new LuceneIndexProvider(tracker);
-
+ queue = new DocumentQueue(100, tracker, sameThreadExecutor());
LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider(copier,
tracker,
null,
null,
mip);
+ editorProvider.setIndexingQueue(queue);
- queue = new DocumentQueue(100, tracker, sameThreadExecutor());
LocalIndexObserver localIndexObserver = new LocalIndexObserver(queue, StatisticsProvider.NOOP);
nodeStore = new MemoryNodeStore();
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java Mon Jan 9 05:35:17 2017
@@ -72,8 +72,8 @@ public class LocalIndexObserverTest {
CommitInfo info = newCommitInfo();
CommitContext cc = (CommitContext) info.getInfo().get(CommitContext.NAME);
- LuceneDocumentHolder holder = new LuceneDocumentHolder();
- holder.getNRTIndexedDocList("foo").add(LuceneDoc.forDelete("foo", "bar"));
+ LuceneDocumentHolder holder = new LuceneDocumentHolder(collectingQueue, 500);
+ holder.add(false, LuceneDoc.forDelete("foo", "bar"));
cc.set(LuceneDocumentHolder.NAME, holder);
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java Mon Jan 9 05:35:17 2017
@@ -20,11 +20,14 @@
package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
import java.io.IOException;
+import java.util.List;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.core.SimpleCommitContext;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.IndexingMode;
import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil;
@@ -39,6 +42,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
import static org.junit.Assert.*;
@@ -51,16 +55,19 @@ public class LocalIndexWriterFactoryTest
private EditorHook asyncHook;
private CommitInfo info;
private LuceneIndexEditorProvider editorProvider;
+ private IndexTracker tracker;
@Before
public void setUp() throws IOException {
+ tracker = new IndexTracker();
+ DocumentQueue queue = new DocumentQueue(100, tracker, sameThreadExecutor());
editorProvider = new LuceneIndexEditorProvider(
null,
null,
null,
Mounts.defaultMountInfoProvider()
);
-
+ editorProvider.setIndexingQueue(queue);
syncHook = new EditorHook(new IndexUpdateProvider(editorProvider));
asyncHook = new EditorHook(new IndexUpdateProvider(editorProvider, "async", false));
}
@@ -97,7 +104,7 @@ public class LocalIndexWriterFactoryTest
assertNotNull(holder);
//2 add none for delete
- assertEquals(2, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+ assertEquals(2, getIndexedDocList(holder, "/oak:index/fooIndex").size());
}
@Test
@@ -114,10 +121,10 @@ public class LocalIndexWriterFactoryTest
assertNotNull(holder);
//1 add - bar
- assertEquals(1, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+ assertEquals(1, getIndexedDocList(holder, "/oak:index/fooIndex").size());
//1 add - bar
- assertEquals(1, holder.getNRTIndexedDocList("/oak:index/barIndex").size());
+ assertEquals(1, getIndexedDocList(holder, "/oak:index/barIndex").size());
}
@@ -134,14 +141,14 @@ public class LocalIndexWriterFactoryTest
assertNotNull(holder);
//2 add none for delete
- assertEquals(2, holder.getSyncIndexedDocList("/oak:index/fooIndex").size());
- assertEquals(0, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+ assertEquals(2, getIndexedDocList(holder,"/oak:index/fooIndex").size());
}
@Test
public void inMemoryDocLimit() throws Exception{
NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.NRT);
editorProvider.setInMemoryDocsLimit(5);
+ editorProvider.setIndexingQueue(new DocumentQueue(1, tracker, sameThreadExecutor()));
builder = indexed.builder();
for (int i = 0; i < 10; i++) {
builder.child("b" + i).setProperty("foo", "bar");
@@ -150,7 +157,8 @@ public class LocalIndexWriterFactoryTest
syncHook.processCommit(indexed, after, newCommitInfo());
LuceneDocumentHolder holder = getHolder();
- assertEquals(5, holder.getNRTIndexedDocList("/oak:index/fooIndex").size());
+ //5 for in memory list and 1 in queue
+ assertEquals(5 + 1, getIndexedDocList(holder, "/oak:index/fooIndex").size());
}
private NodeState createAndPopulateAsyncIndex(IndexingMode indexingMode) throws CommitFailedException {
@@ -195,4 +203,13 @@ public class LocalIndexWriterFactoryTest
builder.child("oak:index").setChildNode(idxName, idx.build());
}
+ /**
+  * Returns the document paths of all entries reported by
+  * {@code holder.getAllLuceneDocInfo()} whose index path equals {@code indexPath}.
+  *
+  * @param holder    holder whose recorded documents are inspected
+  * @param indexPath index path to filter on
+  * @return document paths in the order the holder reports them
+  */
+ private static List<String> getIndexedDocList(LuceneDocumentHolder holder, String indexPath){
+     List<String> matched = Lists.newArrayList();
+     for (LuceneDocInfo info : holder.getAllLuceneDocInfo()){
+         // Entries belonging to other indexes are ignored
+         if (!info.getIndexPath().equals(indexPath)){
+             continue;
+         }
+         matched.add(info.getDocPath());
+     }
+     return matched;
+ }
}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java?rev=1777929&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java Mon Jan 9 05:35:17 2017
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItems;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LuceneDocumentHolder}, driven through a stub {@link IndexingQueue}
+ * whose acceptance of documents can be switched on and off by each test.
+ */
+public class LuceneDocumentHolderTest {
+    private final DummyQueue queue = new DummyQueue();
+    // NOTE(review): 100 is presumably the in-memory document limit - confirm against the constructor
+    private final LuceneDocumentHolder holder = new LuceneDocumentHolder(queue, 100);
+
+    /**
+     * Adds two documents while the stub queue rejects everything and two while it
+     * accepts everything, then expects all four document paths to be reported by
+     * {@code getAllLuceneDocInfo()}.
+     */
+    @Test
+    public void testAllLuceneDocReturned() throws Exception{
+        // NOTE(review): the boolean passed to add() presumably selects sync vs. NRT - confirm
+        queue.enabled = false;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/a"));
+        holder.add(false, LuceneDoc.forDelete("/oak:index/bar", "/b"));
+
+        queue.enabled = true;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/c"));
+        holder.add(false, LuceneDoc.forDelete("/oak:index/bar", "/d"));
+
+        assertThat(asMultiMap(holder.getAllLuceneDocInfo()).values(), hasItems("/a", "/b", "/c", "/d"));
+    }
+
+    /**
+     * Adds "/a" and "/c" while the queue accepts documents and "/b" while it rejects
+     * them, marks "/c" as processed, and expects {@code getSyncIndexedDocs()} to
+     * report exactly "/a" and "/b".
+     */
+    @Test
+    public void unprocessedSyncQueuedDocs() throws Exception{
+        queue.enabled = true;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/a"));
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/c"));
+
+        queue.enabled = false;
+        holder.add(true, LuceneDoc.forDelete("/oak:index/foo", "/b"));
+
+        // "/c" was accepted by the stub queue above, so it can be looked up there
+        queue.luceneDocs.get("/c").markProcessed();
+
+        assertThat(asMultiMap(holder.getSyncIndexedDocs()).values(), containsInAnyOrder("/a", "/b"));
+    }
+
+    /**
+     * Groups document paths by index path.
+     *
+     * @param itr document infos to group
+     * @return multimap from index path to the document paths seen for it
+     */
+    private static ListMultimap<String, String> asMultiMap(Iterable<? extends LuceneDocInfo> itr){
+        ListMultimap<String, String> docs = ArrayListMultimap.create();
+        for (LuceneDocInfo d : itr){
+            docs.put(d.getIndexPath(), d.getDocPath());
+        }
+        return docs;
+    }
+
+    /**
+     * Groups document paths by index path. The keys of {@code map} are ignored;
+     * each document's own index path is used as the key of the result.
+     *
+     * @param map collections of documents to flatten
+     * @return multimap from index path to the document paths seen for it
+     */
+    private static ListMultimap<String, String> asMultiMap(Map<String, Collection<LuceneDoc>> map){
+        ListMultimap<String, String> docs = ArrayListMultimap.create();
+        for (Collection<LuceneDoc> lds : map.values()){
+            for (LuceneDoc d : lds){
+                docs.put(d.getIndexPath(), d.getDocPath());
+            }
+        }
+        return docs;
+    }
+
+    /**
+     * Stub {@link IndexingQueue} that accepts documents only while {@link #enabled}
+     * is set and records every document it accepted.
+     */
+    private static class DummyQueue implements IndexingQueue {
+        /** When false, {@link #addIfNotFullWithoutWait(LuceneDoc)} rejects every document. */
+        boolean enabled;
+        /** Accepted document paths, grouped by index path. */
+        final ListMultimap<String, String> docs = ArrayListMultimap.create();
+        /** Accepted documents keyed by document path; a later document with the same path replaces the earlier one. */
+        final Map<String, LuceneDoc> luceneDocs = Maps.newHashMap();
+
+        @Override
+        public boolean addIfNotFullWithoutWait(LuceneDoc doc) {
+            if (enabled){
+                docs.put(doc.getIndexPath(), doc.getDocPath());
+                luceneDocs.put(doc.getDocPath(), doc);
+            }
+            return enabled;
+        }
+
+        @Override
+        public boolean add(LuceneDoc doc) {
+            // Not exercised by these tests; fail loudly if that ever changes
+            throw new UnsupportedOperationException("add is not supported by DummyQueue");
+        }
+
+        @Override
+        public void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex) {
+            // Not exercised by these tests; fail loudly if that ever changes
+            throw new UnsupportedOperationException("addAllSynchronously is not supported by DummyQueue");
+        }
+
+        @Override
+        public void scheduleQueuedDocsProcessing() {
+            // Intentionally a no-op: the stub only records documents
+        }
+    }
+}
\ No newline at end of file
Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java?rev=1777929&r1=1777928&r2=1777929&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/HybridIndexTest.java Mon Jan 9 05:35:17 2017
@@ -338,6 +338,7 @@ public class HybridIndexTest extends Abs
queue = new DocumentQueue(queueSize, tracker, executorService, statsProvider);
localIndexObserver = new LocalIndexObserver(queue, statsProvider);
+ luceneEditorProvider.setIndexingQueue(queue);
}
private void runAsyncIndex() {