You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2016/09/15 07:14:06 UTC
svn commit: r1760831 [1/2] - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ t...
Author: chetanm
Date: Thu Sep 15 07:14:06 2016
New Revision: 1760831
URL: http://svn.apache.org/viewvc?rev=1760831&view=rev
Log:
OAK-4412 - Lucene hybrid index
Base implementation for the hybrid index. Main parts working with some todo and OSGi integration pending
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactoryTest.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactoryTest.java (with props)
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java Thu Sep 15 07:14:06 2016
@@ -231,6 +231,8 @@ public final class IndexDefinition imple
private final String indexPath;
+ private final boolean sync;
+
@Nullable
private final String uid;
@@ -303,6 +305,7 @@ public final class IndexDefinition imple
this.secureFacets = defn.hasChildNode(FACETS) && getOptionalValue(defn.getChildNode(FACETS), PROP_SECURE_FACETS, true);
this.suggestEnabled = evaluateSuggestionEnabled();
this.spellcheckEnabled = evaluateSpellcheckEnabled();
+ this.sync = determineSync(defn);
}
public NodeState getDefinitionNodeState() {
@@ -433,6 +436,10 @@ public final class IndexDefinition imple
return uid;
}
+ public boolean isSync() {
+ return sync;
+ }
+
@Override
public String toString() {
return "Lucene Index : " + indexName;
@@ -1565,4 +1572,10 @@ public final class IndexDefinition imple
return version == IndexFormatVersion.V1 ? 1.5 : 1.0;
}
+    /**
+     * Tells whether the index definition lists the value "sync" among the
+     * values of its async property.
+     *
+     * @param defn index definition node state
+     * @return true if "sync" is one of the configured values
+     */
+    private static boolean determineSync(NodeState defn) {
+        //TODO [hybrid] make it a constant
+        return Iterables.contains(defn.getStrings(IndexConstants.ASYNC_PROPERTY_NAME), "sync");
+    }
+
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java Thu Sep 15 07:14:06 2016
@@ -20,15 +20,23 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkState;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.common.base.Preconditions;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndex;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReader;
import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
@@ -36,18 +44,26 @@ import org.apache.lucene.search.IndexSea
import org.apache.lucene.search.suggest.analyzing.AnalyzingInfixSuggester;
import org.apache.lucene.store.Directory;
-class IndexNode {
+public class IndexNode {
- static IndexNode open(String indexPath, NodeState root, NodeState defnNodeState, LuceneIndexReaderFactory readerFactory)
+ static IndexNode open(String indexPath, NodeState root, NodeState defnNodeState,
+ LuceneIndexReaderFactory readerFactory, @Nullable NRTIndexFactory nrtFactory)
throws IOException {
IndexDefinition definition = new IndexDefinition(root, defnNodeState);
List<LuceneIndexReader> readers = readerFactory.createReaders(definition, defnNodeState, indexPath);
+ NRTIndex nrtIndex = nrtFactory != null ? nrtFactory.createIndex(definition) : null;
if (!readers.isEmpty()){
- return new IndexNode(PathUtils.getName(indexPath), definition, readers);
+ return new IndexNode(PathUtils.getName(indexPath), definition, readers, nrtIndex);
}
return null;
}
+ /**
+ * Time interval after which readers would be refreshed in case of real time index
+ * TODO Make this configurable
+ */
+ private final long refreshDelta = TimeUnit.SECONDS.toMillis(1);
+
private final List<LuceneIndexReader> readers;
private final String name;
@@ -56,17 +72,22 @@ class IndexNode {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final IndexSearcher indexSearcher;
+ private volatile IndexSearcher indexSearcher;
+
+ private final NRTIndex nrtIndex;
private boolean closed = false;
- IndexNode(String name, IndexDefinition definition, List<LuceneIndexReader> readers)
+ private long lastRefreshTime;
+
+ IndexNode(String name, IndexDefinition definition, List<LuceneIndexReader> readers, @Nullable NRTIndex nrtIndex)
throws IOException {
checkArgument(!readers.isEmpty());
this.name = name;
this.definition = definition;
this.readers = readers;
- this.indexSearcher = new IndexSearcher(createReader(readers));
+ this.nrtIndex = nrtIndex;
+ this.indexSearcher = new IndexSearcher(createReader());
}
String getName() {
@@ -77,7 +98,7 @@ class IndexNode {
return definition;
}
- IndexSearcher getSearcher() {
+ public IndexSearcher getSearcher() {
return indexSearcher;
}
@@ -99,7 +120,7 @@ class IndexNode {
}
}
- void release() {
+ public void release() {
lock.readLock().unlock();
}
@@ -112,9 +133,32 @@ class IndexNode {
lock.writeLock().unlock();
}
- for (LuceneIndexReader reader : readers){
+ //Do not close the NRTIndex here as it might be in use
+ //by newer IndexNode. Just close the readers obtained from
+ //them
+ for (LuceneIndexReader reader : Iterables.concat(readers, getNRTReaders())){
reader.close();
- }
+ }
+ }
+
+ @CheckForNull
+ public LuceneIndexWriter getLocalWriter() throws IOException{
+ return nrtIndex != null ? nrtIndex.getWriter() : null;
+ }
+
+    /**
+     * Replaces the current searcher with one built from freshly created
+     * readers, provided more than {@code refreshDelta} milliseconds have
+     * passed since the last refresh.
+     *
+     * @param currentTime current time in milliseconds, as seen by the caller
+     */
+    public void refreshReaders(long currentTime) {
+        //TODO [hybrid] Refreshing currently requires updates to happen
+        //However if no update happened after last update the refresh would not
+        //happen and result would remain stale upto next async cycle. Possibly
+        //introduce a refresh policy
+        //NOTE(review): lastRefreshTime is a plain field - presumably only one
+        //thread calls this at a time; confirm against the callers
+        boolean refreshedRecently = currentTime - lastRefreshTime <= refreshDelta;
+        if (refreshedRecently){
+            return;
+        }
+        lastRefreshTime = currentTime;
+        indexSearcher = new IndexSearcher(createReader());
+    }
+
+ public long getRefreshDelta() {
+ return refreshDelta;
}
private LuceneIndexReader getDefaultReader(){
@@ -122,14 +166,22 @@ class IndexNode {
return readers.get(0);
}
- private IndexReader createReader(List<LuceneIndexReader> readers) {
- if (readers.size() == 1){
+ private IndexReader createReader() {
+ List<LuceneIndexReader> nrtReaders = getNRTReaders();
+ if (readers.size() == 1 && nrtReaders.isEmpty()){
return readers.get(0).getReader();
}
- IndexReader[] readerArr = new IndexReader[readers.size()];
- for (int i = 0; i < readerArr.length; i++) {
- readerArr[i] = readers.get(i).getReader();
+ IndexReader[] readerArr = new IndexReader[readers.size() + nrtReaders.size()];
+ int i = 0;
+ for (LuceneIndexReader r : Iterables.concat(readers, nrtReaders)){
+ readerArr[i++] = r.getReader();
}
return new MultiReader(readerArr, true);
}
+
+ private List<LuceneIndexReader> getNRTReaders() {
+ return nrtIndex != null ? nrtIndex.getReaders() : Collections.<LuceneIndexReader>emptyList();
+ }
+
+
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java Thu Sep 15 07:14:06 2016
@@ -34,7 +34,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
+
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory;
import org.apache.jackrabbit.oak.spi.commit.CompositeEditor;
@@ -51,7 +54,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-class IndexTracker {
+public class IndexTracker {
/** Logger instance. */
private static final Logger log = LoggerFactory.getLogger(IndexTracker.class);
@@ -59,6 +62,7 @@ class IndexTracker {
new PerfLogger(LoggerFactory.getLogger(IndexTracker.class.getName() + ".perf"));
private final LuceneIndexReaderFactory readerFactory;
+ private final NRTIndexFactory nrtFactory;
private NodeState root = EMPTY_NODE;
@@ -66,7 +70,7 @@ class IndexTracker {
private volatile boolean refresh;
- IndexTracker() {
+ public IndexTracker() {
this((IndexCopier)null);
}
@@ -74,8 +78,13 @@ class IndexTracker {
this(new DefaultIndexReaderFactory(Mounts.defaultMountInfoProvider(), cloner));
}
- IndexTracker(LuceneIndexReaderFactory readerFactory){
+ IndexTracker(LuceneIndexReaderFactory readerFactory) {
+ this(readerFactory, null);
+ }
+
+ public IndexTracker(LuceneIndexReaderFactory readerFactory, @Nullable NRTIndexFactory nrtFactory){
this.readerFactory = readerFactory;
+ this.nrtFactory = nrtFactory;
}
synchronized void close() {
@@ -91,7 +100,7 @@ class IndexTracker {
}
}
- synchronized void update(final NodeState root) {
+ public synchronized void update(final NodeState root) {
if (refresh) {
this.root = root;
close();
@@ -115,7 +124,7 @@ class IndexTracker {
public void leave(NodeState before, NodeState after) {
try {
long start = PERF_LOGGER.start();
- IndexNode index = IndexNode.open(path, root, after, readerFactory);
+ IndexNode index = IndexNode.open(path, root, after, readerFactory, nrtFactory);
PERF_LOGGER.end(start, -1, "[{}] Index found to be updated. Reopening the IndexNode", path);
updates.put(path, index); // index can be null
} catch (IOException e) {
@@ -153,7 +162,7 @@ class IndexTracker {
refresh = true;
}
- IndexNode acquireIndexNode(String path) {
+ public IndexNode acquireIndexNode(String path) {
IndexNode index = indices.get(path);
if (index != null && index.acquire()) {
return index;
@@ -183,7 +192,7 @@ class IndexTracker {
try {
if (isLuceneIndexNode(node)) {
- index = IndexNode.open(path, root, node, readerFactory);
+ index = IndexNode.open(path, root, node, readerFactory, nrtFactory);
if (index != null) {
checkState(index.acquire());
indices = ImmutableMap.<String, IndexNode>builder()
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java Thu Sep 15 07:14:06 2016
@@ -19,10 +19,17 @@ package org.apache.jackrabbit.oak.plugin
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.ContextAwareCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditor;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexWriterFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterFactory;
import org.apache.jackrabbit.oak.spi.commit.Editor;
@@ -31,6 +38,7 @@ import org.apache.jackrabbit.oak.spi.mou
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
@@ -66,7 +74,7 @@ public class LuceneIndexEditorProvider i
@Nullable IndexAugmentorFactory augmentorFactory,
MountInfoProvider mountInfoProvider) {
this.indexCopier = indexCopier;
- this.extractedTextCache = checkNotNull(extractedTextCache);
+ this.extractedTextCache = extractedTextCache != null ? extractedTextCache : new ExtractedTextCache(0, 0);
this.augmentorFactory = augmentorFactory;
this.indexWriterFactory = new DefaultIndexWriterFactory(checkNotNull(mountInfoProvider), indexCopier);
}
@@ -77,8 +85,25 @@ public class LuceneIndexEditorProvider i
@Nonnull IndexUpdateCallback callback)
throws CommitFailedException {
if (TYPE_LUCENE.equals(type)) {
+ checkArgument(callback instanceof ContextAwareCallback, "callback instance not of type " +
+ "ContextAwareCallback [%s]", callback);
+ IndexingContext indexingContext = ((ContextAwareCallback)callback).getIndexingContext();
+ LuceneIndexWriterFactory writerFactory = indexWriterFactory;
+ if (!indexingContext.isAsync() && supportsSyncIndexing(definition)) {
+
+ //Would not participate in reindexing. Only interested in
+ //incremental indexing
+ if (indexingContext.isReindexing()){
+ return null;
+ }
+ //TODO [hybrid] switch the builder to readonly one
+ //TODO [hybrid] Make use of existing IndexDefinition to avoid reinit for
+ //every commit
+ writerFactory = new LocalIndexWriterFactory(indexingContext);
+ }
+
LuceneIndexEditorContext context = new LuceneIndexEditorContext(root, definition, callback,
- indexWriterFactory, extractedTextCache, augmentorFactory);
+ writerFactory, extractedTextCache, augmentorFactory);
return new LuceneIndexEditor(context);
}
return null;
@@ -91,4 +116,13 @@ public class LuceneIndexEditorProvider i
ExtractedTextCache getExtractedTextCache() {
return extractedTextCache;
}
+
+    /**
+     * Checks whether the async property of the given index definition is
+     * present and contains the value "sync".
+     *
+     * @param defn builder of the index definition node
+     * @return true if the property exists and lists "sync"
+     */
+    private boolean supportsSyncIndexing(NodeBuilder defn){
+        //TODO [hybrid] Similar logic exists in IndexDefinition. Should be unified
+        PropertyState asyncProp = defn.getProperty(IndexConstants.ASYNC_PROPERTY_NAME);
+        return asyncProp != null
+                && Iterables.contains(asyncProp.getValue(Type.STRINGS), "sync");
+    }
}
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.apache.lucene.index.IndexableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Bounded in-memory queue of {@link LuceneDoc} entries. Queued entries are
+ * taken one at a time by a task run on the supplied {@link Executor} and are
+ * applied to the local writer of the {@link IndexNode} matching the entry's
+ * index path.
+ */
+class DocumentQueue implements Closeable{
+    /**
+     * Marker entry put on the queue by {@link #close()}. The background task
+     * never indexes it and does not reschedule itself after polling it.
+     */
+    private static final LuceneDoc STOP = LuceneDoc.forUpdate("", "", Collections.<IndexableField>emptyList());
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final IndexTracker tracker;
+    private final BlockingQueue<LuceneDoc> docsQueue;
+    private final Clock clock;
+    private final Executor executor;
+    private volatile boolean stopped;
+
+    /**
+     * Handler for uncaught exception on the background thread
+     */
+    private final UncaughtExceptionHandler exceptionHandler = new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            log.error("Uncaught exception", e);
+        }
+    };
+
+    /**
+     * Current background task
+     */
+    private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed();
+
+    /**
+     * Completion handler: set the current task to the next task and schedules that one
+     * on the background thread.
+     */
+    private final Runnable completionHandler = new Runnable() {
+        private final Callable<Void> task = new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    LuceneDoc doc = docsQueue.poll();
+                    //An empty queue or the STOP marker ends the chain; it is
+                    //restarted by the next call to add()
+                    if (doc != null && doc != STOP) {
+                        processDoc(doc);
+                        currentTask.onComplete(completionHandler);
+                    }
+                } catch (Throwable t) {
+                    exceptionHandler.uncaughtException(Thread.currentThread(), t);
+                }
+                return null;
+            }
+        };
+
+        @Override
+        public void run() {
+            currentTask = new NotifyingFutureTask(task);
+            executor.execute(currentTask);
+        }
+    };
+
+    /**
+     * @param maxQueueSize capacity of the underlying queue
+     * @param tracker used to look up the IndexNode for each queued entry
+     * @param clock source of the time passed to {@link IndexNode#refreshReaders(long)}
+     * @param executor runs the background task which drains the queue
+     */
+    DocumentQueue(int maxQueueSize, IndexTracker tracker, Clock clock, Executor executor) {
+        this.docsQueue = new LinkedBlockingDeque<>(maxQueueSize);
+        this.tracker = tracker;
+        this.clock = clock;
+        this.executor = executor;
+    }
+
+    /**
+     * Offers the given entry to the queue and makes sure a background task
+     * gets scheduled.
+     *
+     * @param doc entry to be indexed locally
+     * @return true if the entry was queued, false if the queue was full
+     * @throws IllegalStateException if the queue has been closed
+     */
+    public boolean add(LuceneDoc doc){
+        checkState(!stopped);
+        boolean added = docsQueue.offer(doc);
+        // Set the completion handler on the currently running task. Multiple calls
+        // to onComplete are not a problem here since we always pass the same value.
+        // Thus there is no question as to which of the handlers will effectively run.
+        currentTask.onComplete(completionHandler);
+        //TODO log warning when queue is full
+        return added;
+    }
+
+    /**
+     * @return a copy of the entries currently waiting in the queue
+     */
+    List<LuceneDoc> getQueuedDocs(){
+        List<LuceneDoc> docs = Lists.newArrayList();
+        docs.addAll(docsQueue);
+        return docs;
+    }
+
+    /**
+     * Applies a single entry to the local writer of its index. Entries for
+     * which no IndexNode or no local writer is available are skipped; failures
+     * are logged and not propagated.
+     */
+    private void processDoc(LuceneDoc doc){
+        IndexNode indexNode = tracker.acquireIndexNode(doc.indexPath);
+        if (indexNode == null) {
+            log.debug("No IndexNode found for index [{}]. Skipping index entry for [{}]", doc.indexPath, doc.docPath);
+            return;
+        }
+
+        try{
+            LuceneIndexWriter writer = indexNode.getLocalWriter();
+
+            if (writer == null){
+                //IndexDefinition per IndexNode might have changed and local
+                //indexing is disabled. Ignore
+                log.debug("No local IndexWriter found for index [{}]. Skipping index " +
+                        "entry for [{}]", doc.indexPath, doc.docPath);
+                return;
+            }
+            if (doc.delete) {
+                writer.deleteDocuments(doc.docPath);
+            } else {
+                writer.updateDocument(doc.docPath, doc.doc);
+            }
+            //TODO Support for immediate refresh
+            indexNode.refreshReaders(clock.getTime());
+        } catch (Exception e) {
+            //For now we just log it. Later we need to see if frequent error then to
+            //temporarily disable indexing for this index
+            log.warn("Error occurred while indexing node [{}] for index [{}]",doc.docPath, doc.indexPath, e);
+        } finally {
+            //Always pair the successful acquireIndexNode() with a release
+            indexNode.release();
+        }
+    }
+
+    /**
+     * Stops the queue: further calls to {@link #add(LuceneDoc)} are rejected
+     * and pending entries are dropped.
+     */
+    @Override
+    public void close() throws IOException {
+        //Reject new entries before dropping the pending ones, otherwise a
+        //concurrent add() could refill the queue while it is being closed
+        stopped = true;
+
+        //Its fine to "drop" any entry in queue as
+        //local index is meant for running state only
+        docsQueue.clear();
+
+        //offer() instead of add(): the queue is bounded and add() would throw
+        //IllegalStateException if a racing producer filled it after clear()
+        if (!docsQueue.offer(STOP)) {
+            log.debug("Queue full while closing. STOP marker not queued");
+        }
+
+        //TODO Should we wait for STOP to be processed
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * Observer which forwards the Lucene documents collected during a commit
+ * to the {@link DocumentQueue}.
+ */
+public class LocalIndexObserver implements Observer{
+    private final DocumentQueue docQueue;
+
+    /**
+     * @param docQueue queue receiving the documents found in each commit
+     */
+    public LocalIndexObserver(DocumentQueue docQueue) {
+        this.docQueue = docQueue;
+    }
+
+    /**
+     * Looks up the {@link LuceneDocumentHolder} attached to the commit and,
+     * if there is one, adds each of its documents to the queue.
+     */
+    @Override
+    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+        //TODO [hybrid] Do external diff?
+        LuceneDocumentHolder docHolder = findHolder(info);
+        if (docHolder != null){
+            for (LuceneDoc luceneDoc : docHolder.getAsyncIndexedDocs()){
+                docQueue.add(luceneDoc);
+            }
+        }
+    }
+
+    /**
+     * Resolves the document holder stored in the commit context of the given
+     * commit info.
+     *
+     * @return the holder, or null if the commit info, the commit context or
+     * the holder itself is missing
+     */
+    private static LuceneDocumentHolder findHolder(@Nullable CommitInfo info) {
+        if (info == null){
+            return null;
+        }
+
+        //Commit done internally i.e. one not using Root/Tree API
+        CommitContext ctx = (CommitContext) info.getInfo().get(CommitContext.NAME);
+        if (ctx == null){
+            return null;
+        }
+
+        //A null result here means nothing to be indexed
+        return (LuceneDocumentHolder) ctx.get(LuceneDocumentHolder.NAME);
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.index.IndexableField;
+
+/**
+ * Writer factory whose writers do not write to any index. Each update or
+ * delete is recorded as a {@link LuceneDoc} in the {@link LuceneDocumentHolder}
+ * kept in the commit context of the current commit.
+ */
+public class LocalIndexWriterFactory implements LuceneIndexWriterFactory {
+    public static final String COMMIT_PROCESSED_BY_LOCAL_LUCENE_EDITOR = "commitProcessedByLocalLuceneEditor";
+    private final IndexingContext indexingContext;
+    private final CommitContext commitContext;
+
+    /**
+     * @param indexingContext context of the current indexing run; its commit
+     * info must carry a {@link CommitContext}
+     */
+    public LocalIndexWriterFactory(IndexingContext indexingContext) {
+        this.indexingContext = indexingContext;
+        this.commitContext = getCommitContext(indexingContext);
+    }
+
+    /**
+     * Returns the holder registered in the commit context, registering a new
+     * one first if none is present yet.
+     */
+    private LuceneDocumentHolder getDocumentHolder(){
+        Object registered = commitContext.get(LuceneDocumentHolder.NAME);
+        if (registered != null) {
+            return (LuceneDocumentHolder) registered;
+        }
+        //lazily initialize the holder
+        LuceneDocumentHolder created = new LuceneDocumentHolder();
+        commitContext.set(LuceneDocumentHolder.NAME, created);
+        return created;
+    }
+
+    /**
+     * Extracts the commit context from the commit info of the indexing
+     * context; fails if there is none.
+     */
+    private static CommitContext getCommitContext(IndexingContext indexingContext) {
+        Object ctx = indexingContext.getCommitInfo().getInfo().get(CommitContext.NAME);
+        return Preconditions.checkNotNull((CommitContext) ctx, "No commit context found in commit info");
+    }
+
+    @Override
+    public LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder definitionBuilder, boolean reindex) {
+        return new LocalIndexWriter(definition);
+    }
+
+    /**
+     * Writer which turns each call into a {@link LuceneDoc} appended to the
+     * holder's list for the current index path.
+     */
+    private class LocalIndexWriter implements LuceneIndexWriter {
+        private final IndexDefinition definition;
+        //Obtained from the holder on first use
+        private List<LuceneDoc> docList;
+
+        public LocalIndexWriter(IndexDefinition definition) {
+            this.definition = definition;
+        }
+
+        @Override
+        public void updateDocument(String path, Iterable<? extends IndexableField> doc) throws IOException {
+            String indexPath = definition.getIndexPathFromConfig();
+            addLuceneDoc(LuceneDoc.forUpdate(indexPath, path, doc));
+        }
+
+        @Override
+        public void deleteDocuments(String path) throws IOException {
+            String indexPath = definition.getIndexPathFromConfig();
+            addLuceneDoc(LuceneDoc.forDelete(indexPath, path));
+        }
+
+        @Override
+        public boolean close(long timestamp) throws IOException {
+            //This is used by testcase
+            commitContext.set(COMMIT_PROCESSED_BY_LOCAL_LUCENE_EDITOR, Boolean.TRUE);
+            //always return false as nothing gets written to the index
+            return false;
+        }
+
+        private void addLuceneDoc(LuceneDoc luceneDoc) {
+            List<LuceneDoc> target = docList;
+            if (target == null){
+                target = getDocumentHolder().getAsyncIndexedDocList(indexingContext.getIndexPath());
+                docList = target;
+            }
+            //TODO [hybrid] checks about the size. If too many drop
+            //However for truly sync case hold on
+            target.add(luceneDoc);
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import javax.annotation.Nullable;
+
+import org.apache.lucene.index.IndexableField;
+
+class LuceneDoc {
+ final String indexPath;
+ final String docPath;
+ final Iterable<? extends IndexableField> doc;
+ final boolean delete;
+
+ public static LuceneDoc forUpdate(String indexPath, String path, Iterable<? extends IndexableField> doc){
+ return new LuceneDoc(indexPath, path, doc, false);
+ }
+
+ public static LuceneDoc forDelete(String indexPath, String path){
+ return new LuceneDoc(indexPath, path, null, true);
+ }
+
+ private LuceneDoc(String indexPath, String path, @Nullable Iterable<? extends IndexableField> doc, boolean delete) {
+ this.docPath = path;
+ this.indexPath = indexPath;
+ this.doc = doc;
+ this.delete = delete;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(%s)", indexPath, docPath);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDoc.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.List;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Groups {@link LuceneDoc} instances by the path of the index they belong to.
+ */
+class LuceneDocumentHolder {
+    //NOTE(review): presumably the key under which the holder gets registered
+    //with the commit - confirm against the code which looks it up
+    public static final String NAME = "oak.lucene.documentHolder";
+
+    private final ListMultimap<String, LuceneDoc> docsByIndexPath = ArrayListMultimap.create();
+
+    /**
+     * Returns the list of docs kept for {@code indexPath}. The list is the
+     * multimap's own view for that key, so anything added to it is stored in
+     * this holder.
+     */
+    public List<LuceneDoc> getAsyncIndexedDocList(String indexPath) {
+        List<LuceneDoc> docsForIndex = docsByIndexPath.get(indexPath);
+        return docsForIndex;
+    }
+
+    /**
+     * Returns every doc kept in this holder, across all index paths.
+     */
+    public Iterable<LuceneDoc> getAsyncIndexedDocs(){
+        Iterable<LuceneDoc> allDocs = docsByIndexPath.values();
+        return allDocs;
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LuceneDocumentHolder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReader;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterUtils;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.suggest.analyzing.AnalyzingInfixSuggester;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.NRTCachingDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+/**
+ * A local, near real time (NRT) Lucene index for a single index definition.
+ *
+ * <p>Nothing is created on disk until {@link #getWriter()} is called for the
+ * first time. At that point a fresh directory, named with the {@code nrt-}
+ * prefix, is obtained from the {@link IndexCopier} and an {@link IndexWriter}
+ * is opened on it. Readers are opened directly from that writer. On
+ * {@link #close()} the writer and the directory are closed and the directory
+ * is deleted from disk.
+ *
+ * <p>When constructed with a previous NRTIndex the reader of that index, as it
+ * was at construction time, is handed out together with the reader of this
+ * index by {@link #getReaders()}.
+ *
+ * <p>NOTE(review): only {@code createWriter()} is synchronized, the null check
+ * in {@link #getWriter()} and the {@code closed} flag are not guarded.
+ */
+public class NRTIndex implements Closeable {
+    //Contributes to the name of the on disk directory, see createWriter()
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+    private static final Logger log = LoggerFactory.getLogger(NRTIndex.class);
+
+    /**
+     * Prefix used for naming the directory created for NRT indexes
+     */
+    private static final String NRT_DIR_PREFIX = "nrt-";
+
+    private final IndexDefinition definition;
+    private final IndexCopier indexCopier;
+    //Reader of the previous NRTIndex captured at construction time. Null if
+    //there was no previous index or nothing had been written to it
+    private final LuceneIndexReader previousReader;
+
+    //All of the below stay null until the writer is created
+    private IndexWriter indexWriter;
+    private NRTIndexWriter nrtIndexWriter;
+    private File indexDir;
+    private Directory directory;
+    private boolean closed;
+
+    /**
+     * @param definition definition of the index this NRT index serves
+     * @param indexCopier used to obtain the local directory for the index files
+     * @param previous index used before this one, may be {@code null}. Its
+     *                 primary reader is obtained right away, which fails with
+     *                 an {@link IllegalStateException} if it is already closed
+     */
+    public NRTIndex(IndexDefinition definition, IndexCopier indexCopier, @Nullable NRTIndex previous) {
+        this.definition = definition;
+        this.indexCopier = indexCopier;
+        this.previousReader = previous != null ? previous.getPrimaryReader() : null;
+    }
+
+    /**
+     * Opens a reader over the content of this index only.
+     *
+     * @return the reader or {@code null} if nothing has been written yet or
+     * the reader could not be opened
+     */
+    @CheckForNull
+    LuceneIndexReader getPrimaryReader() {
+        return createReader();
+    }
+
+    /**
+     * Returns the writer of this index, creating it (and the backing
+     * directory) on first use.
+     *
+     * @throws IllegalStateException if the index has been closed
+     */
+    public LuceneIndexWriter getWriter() throws IOException {
+        checkState(!closed);
+        if (nrtIndexWriter == null) {
+            nrtIndexWriter = createWriter();
+        }
+        return nrtIndexWriter;
+    }
+
+    /**
+     * Returns the readers to query, newest first: a reader over this index (if
+     * anything has been written to it) followed by the reader of the previous
+     * index (if any).
+     *
+     * @throws IllegalStateException if the index has been closed
+     */
+    public List<LuceneIndexReader> getReaders() {
+        checkState(!closed);
+        List<LuceneIndexReader> readers = Lists.newArrayListWithCapacity(2);
+        LuceneIndexReader latestReader = createReader();
+        if (latestReader != null) {
+            readers.add(latestReader);
+        }
+
+        //Old reader should be added later
+        if (previousReader != null) {
+            readers.add(previousReader);
+        }
+        return readers;
+    }
+
+    /**
+     * Closes the writer and the directory and removes the directory from
+     * disk. Calling it again is a no-op.
+     *
+     * <p>The directory is closed and removed even if closing the writer
+     * fails; the failure is still propagated to the caller.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        //Flag the instance first: even if releasing a resource fails below
+        //this index must not be handed out again
+        closed = true;
+
+        if (indexWriter == null) {
+            //Writer was never created so there is nothing to release
+            return;
+        }
+
+        try {
+            indexWriter.close();
+        } finally {
+            try {
+                directory.close();
+            } finally {
+                FileUtils.deleteQuietly(indexDir);
+                log.debug("[{}] Removed directory [{}]", this, indexDir);
+            }
+        }
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * @return the index path as configured in the index definition
+     */
+    @Override
+    public String toString() {
+        return definition.getIndexPathFromConfig();
+    }
+
+    //For test
+    File getIndexDir() {
+        return indexDir;
+    }
+
+    /**
+     * Opens a new reader from the writer of this index.
+     *
+     * @return the reader or {@code null} if the writer has not been created
+     * yet or opening the reader failed (the failure is logged)
+     * @throws IllegalStateException if the index has been closed
+     */
+    @CheckForNull
+    private LuceneIndexReader createReader() {
+        checkState(!closed);
+        //Its possible that readers are obtained
+        //before anything gets indexed
+        if (indexWriter == null) {
+            return null;
+        }
+        try {
+            //applyDeletes is false as layers above would take care of
+            //stale result
+            return new NRTReader(DirectoryReader.open(indexWriter, false));
+        } catch (IOException e) {
+            //The index goes in as an explicit argument. With the exception as
+            //the only argument the placeholder would be left unresolved
+            log.warn("Error opening index [{}]", this, e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates the on disk directory and opens the Lucene writer on top of it.
+     */
+    private synchronized NRTIndexWriter createWriter() throws IOException {
+        //Current time plus a JVM wide counter, so that directories created
+        //within the same millisecond still get different names
+        long uniqueCount = System.currentTimeMillis() + COUNTER.incrementAndGet();
+        String dirName = NRT_DIR_PREFIX + uniqueCount;
+        indexDir = indexCopier.getIndexDir(definition, definition.getIndexPathFromConfig(), dirName);
+        Directory fsdir = FSDirectory.open(indexDir);
+        //TODO make these configurable
+        directory = new NRTCachingDirectory(fsdir, 1, 1);
+        IndexWriterConfig config = IndexWriterUtils.getIndexWriterConfig(definition, false);
+        indexWriter = new IndexWriter(directory, config);
+        return new NRTIndexWriter(indexWriter);
+    }
+
+    /**
+     * Plain wrapper exposing an {@link IndexReader}. It offers neither a
+     * suggester nor a suggest directory.
+     */
+    private static class NRTReader implements LuceneIndexReader {
+        private final IndexReader indexReader;
+
+        public NRTReader(IndexReader indexReader) {
+            this.indexReader = indexReader;
+        }
+
+        @Override
+        public IndexReader getReader() {
+            return indexReader;
+        }
+
+        @Override
+        public AnalyzingInfixSuggester getLookup() {
+            return null;
+        }
+
+        @Override
+        public Directory getSuggestDirectory() {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            //No-op: the wrapped reader is not closed here.
+            //NOTE(review): nothing in this class closes the wrapped reader
+            //either - confirm that this is intended
+        }
+    }
+
+    /**
+     * Writer which only ever adds documents to the underlying
+     * {@link IndexWriter}. Its lifecycle is owned by the enclosing index,
+     * hence {@link #close(long)} must not be called.
+     */
+    private static class NRTIndexWriter implements LuceneIndexWriter {
+        private final IndexWriter indexWriter;
+
+        public NRTIndexWriter(IndexWriter indexWriter) {
+            this.indexWriter = indexWriter;
+        }
+
+        @Override
+        public void updateDocument(String path, Iterable<? extends IndexableField> doc) throws IOException {
+            //For NRT case documents are never updated
+            //instead they are just added. This would cause duplicates
+            //That should be taken care at query side via unique cursor
+            indexWriter.addDocument(doc);
+        }
+
+        @Override
+        public void deleteDocuments(String path) throws IOException {
+            //Do not delete documents. Query side would handle it
+        }
+
+        @Override
+        public boolean close(long timestamp) throws IOException {
+            throw new IllegalStateException("Close should not be called");
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import javax.annotation.CheckForNull;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class NRTIndexFactory implements Closeable{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ListMultimap<String, NRTIndex> indexes = LinkedListMultimap.create();
+ private final IndexCopier indexCopier;
+
+ public NRTIndexFactory(IndexCopier indexCopier) {
+ this.indexCopier = checkNotNull(indexCopier);
+ }
+
+ //This would not be invoked concurrently
+ // but still mark it synchronized for safety
+ @CheckForNull
+ public synchronized NRTIndex createIndex(IndexDefinition definition) {
+ if (!definition.isSync()){
+ return null;
+ }
+ String indexPath = definition.getIndexPathFromConfig();
+ NRTIndex current = new NRTIndex(definition, indexCopier, getPrevious(indexPath));
+ indexes.put(indexPath, current);
+ closeLast(indexPath);
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (NRTIndex index : indexes.values()){
+ index.close();
+ }
+ indexes.clear();
+ }
+
+ List<NRTIndex> getIndexes(String path){
+ return indexes.get(path);
+ }
+
+ private void closeLast(String indexPath) {
+ List<NRTIndex> existing = indexes.get(indexPath);
+ if (existing.size() < 3){
+ return;
+ }
+ NRTIndex oldest = existing.remove(0);
+ try {
+ oldest.close();
+ } catch (IOException e) {
+ log.warn("Error occurred while closing index [{}]", oldest, e);
+ }
+ }
+
+ private NRTIndex getPrevious(String indexPath) {
+ List<NRTIndex> existing = indexes.get(indexPath);
+ if (existing.isEmpty()){
+ return null;
+ }
+ checkArgument(existing.size() <= 2, "Found [%s] more than 3 index", existing.size());
+ return existing.get(existing.size() - 1);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinitionTest.java Thu Sep 15 07:14:06 2016
@@ -77,6 +77,7 @@ public class IndexDefinitionTest {
public void defaultConfig() throws Exception{
IndexDefinition idxDefn = new IndexDefinition(root, builder.getNodeState());
assertTrue(idxDefn.saveDirListing());
+ assertFalse(idxDefn.isSync());
}
@Test
@@ -866,6 +867,13 @@ public class IndexDefinitionTest {
}
+ //An index definition whose async property lists "sync" next to "async"
+ //must report itself as sync
+ @Test
+ public void sync() throws Exception{
+ builder.setProperty(createProperty(IndexConstants.ASYNC_PROPERTY_NAME, of("sync" , "async"), STRINGS));
+ IndexDefinition idxDefn = new IndexDefinition(root, builder.getNodeState());
+ assertTrue(idxDefn.isSync());
+ }
+ }
+
//TODO indexesAllNodesOfMatchingType - with nullCheckEnabled
private static IndexingRule getRule(IndexDefinition defn, String typeName){
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java Thu Sep 15 07:14:06 2016
@@ -768,11 +768,11 @@ public class IndexPlannerTest {
//------ END - Suggestion/spellcheck plan tests
private IndexNode createIndexNode(IndexDefinition defn, long numOfDocs) throws IOException {
- return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory(numOfDocs)).createReaders(defn, EMPTY_NODE, "foo"));
+ return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory(numOfDocs)).createReaders(defn, EMPTY_NODE, "foo"), null);
}
private IndexNode createIndexNode(IndexDefinition defn) throws IOException {
- return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory()).createReaders(defn, EMPTY_NODE, "foo"));
+ return new IndexNode("foo", defn, new TestReaderFactory(createSampleDirectory()).createReaders(defn, EMPTY_NODE, "foo"), null);
}
private FilterImpl createFilter(String nodeTypeName) {
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java Thu Sep 15 07:14:06 2016
@@ -2422,7 +2422,7 @@ public class LucenePropertyIndexTest ext
return createIndex(index, name, propNames);
}
- static Tree createIndex(Tree index, String name, Set<String> propNames) throws CommitFailedException {
+ public static Tree createIndex(Tree index, String name, Set<String> propNames) throws CommitFailedException {
Tree def = index.addChild(INDEX_DEFINITIONS_NAME).addChild(name);
def.setProperty(JcrConstants.JCR_PRIMARYTYPE,
INDEX_DEFINITIONS_NODE_TYPE, Type.NAME);
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java?rev=1760831&r1=1760830&r2=1760831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java Thu Sep 15 07:14:06 2016
@@ -140,7 +140,7 @@ public class MultiplexingLucenePropertyI
LuceneIndexReaderFactory readerFactory = new DefaultIndexReaderFactory(mip, null);
List<LuceneIndexReader> readers = readerFactory.createReaders(defn, builder.getNodeState(),"/foo");
- IndexNode node = new IndexNode("foo", defn, readers);
+ IndexNode node = new IndexNode("foo", defn, readers, null);
//3 Obtain the plan
FilterImpl filter = createFilter("nt:base");
Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.core.SimpleCommitContext;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.api.Type.STRINGS;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.FieldFactory.newPathField;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexObserverTest.NOOP_EXECUTOR;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.newLucenePropertyIndexDefinition;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
+import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
+import static org.apache.jackrabbit.oak.spi.mount.Mounts.defaultMountInfoProvider;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link DocumentQueue}: behaviour on reaching the queue limit, after
+ * close, when no index or no NRT writer is available, and the interplay of
+ * queued documents with reader refresh and async index update cycles.
+ */
+public class DocumentQueueTest {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+
+ private NodeState root = INITIAL_CONTENT;
+ private NodeBuilder builder = root.builder();
+ private EditorHook asyncHook;
+ //NOTE(review): initialised in setUp but not used by any test in this class
+ private EditorHook syncHook;
+ private CommitInfo info;
+
+ //Tracker created without an NRTIndexFactory. Tests which need NRT indexes
+ //shadow it with the one returned by createTracker()
+ private IndexTracker tracker = new IndexTracker();
+ //Only set by createTracker()
+ private NRTIndexFactory indexFactory;
+
+ @Before
+ public void setUp() throws IOException {
+ IndexEditorProvider editorProvider = new LuceneIndexEditorProvider(
+ null,
+ null,
+ null,
+ defaultMountInfoProvider()
+ );
+
+ syncHook = new EditorHook(new IndexUpdateProvider(editorProvider));
+ asyncHook = new EditorHook(new IndexUpdateProvider(editorProvider, "async", false));
+ }
+
+ //Uses NOOP_EXECUTOR - presumably queued docs are then never processed and
+ //so keep occupying the queue (confirm in LocalIndexObserverTest)
+ @Test
+ public void dropDocOnLimit() throws Exception{
+ DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, NOOP_EXECUTOR);
+ assertTrue(queue.add(LuceneDoc.forDelete("foo", "bar")));
+ assertTrue(queue.add(LuceneDoc.forDelete("foo", "bar")));
+
+ //3rd one would be dropped as queue size is 2
+ assertFalse(queue.add(LuceneDoc.forDelete("foo", "bar")));
+ }
+
+ //The tracker knows no index at all here: adding must still succeed and
+ //leave nothing behind in the queue
+ @Test
+ public void noIssueIfNoIndex() throws Exception{
+ DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+ assertTrue(queue.add(LuceneDoc.forDelete("foo", "bar")));
+ assertTrue(queue.getQueuedDocs().isEmpty());
+ }
+
+ //Adding to a closed queue must fail with IllegalStateException
+ @Test
+ public void closeQueue() throws Exception{
+ DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+ queue.close();
+
+ try {
+ queue.add(LuceneDoc.forDelete("foo", "bar"));
+ fail();
+ } catch(IllegalStateException ignore){
+ //expected as the queue has been closed
+
+ }
+ }
+
+ //The index exists but the tracker was created without an NRTIndexFactory,
+ //so presumably no local writer is available - adding must still succeed
+ @Test
+ public void noIssueIfNoWriter() throws Exception{
+ NodeState indexed = createAndPopulateAsyncIndex();
+ DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+
+ tracker.update(indexed);
+ assertTrue(queue.add(LuceneDoc.forDelete("/oak:index/fooIndex", "bar")));
+ }
+
+ //A queued update must end up as one document in the latest NRTIndex
+ @Test
+ public void updateDocument() throws Exception{
+ IndexTracker tracker = createTracker();
+ NodeState indexed = createAndPopulateAsyncIndex();
+ tracker.update(indexed);
+ DocumentQueue queue = new DocumentQueue(2, tracker, Clock.SIMPLE, sameThreadExecutor());
+
+ Document d1 = new Document();
+ d1.add(newPathField("/a/b"));
+ d1.add(new StringField("foo", "a", Field.Store.NO));
+ queue.add(LuceneDoc.forUpdate("/oak:index/fooIndex", "/a/b", d1));
+
+ List<NRTIndex> indexes = indexFactory.getIndexes("/oak:index/fooIndex");
+ NRTIndex index = indexes.get(indexes.size() - 1);
+ assertEquals(1, index.getPrimaryReader().getReader().numDocs());
+ }
+
+ //Walks through reader refresh driven by a virtual clock (phase 1) and the
+ //effect of async index update cycles on the NRT indexes (phase 2).
+ //The expected hit counts depend on the exact order of the steps below
+ @Test
+ public void indexRefresh() throws Exception{
+ IndexTracker tracker = createTracker();
+ NodeState indexed = createAndPopulateAsyncIndex();
+ tracker.update(indexed);
+
+ //Virtual clock positioned at the current wall clock time
+ Clock clock = new Clock.Virtual();
+ clock.waitUntil(System.currentTimeMillis());
+
+ DocumentQueue queue = new DocumentQueue(2, tracker, clock, sameThreadExecutor());
+
+ //Only the node indexed by createAndPopulateAsyncIndex() matches so far
+ IndexNode indexNode = tracker.acquireIndexNode("/oak:index/fooIndex");
+ TopDocs td = doSearch(indexNode, "bar");
+ assertEquals(1, td.totalHits);
+
+ addDoc(queue, "/a/b", "bar");
+
+ //First update would be picked as base time was zero which would now
+ //get initialized
+ td = doSearch(indexNode, "bar");
+ assertEquals(2, td.totalHits);
+
+ addDoc(queue, "/a/c", "bar");
+
+ //Now it would not update as refresh interval has not exceeded
+ td = doSearch(indexNode, "bar");
+ assertEquals(2, td.totalHits);
+
+ //Get past the delta time
+ clock.waitUntil(clock.getTime() + indexNode.getRefreshDelta() + 1);
+
+ addDoc(queue, "/a/d", "bar");
+
+ //Now it should show updated result
+ td = doSearch(indexNode, "bar");
+ assertEquals(4, td.totalHits);
+
+ //Phase 2 - Check effect of async index update cycle
+ //With that there should only be 2 copies of NRTIndex kept
+ indexed = doAsyncIndex(indexed, "a2", "bar");
+
+ indexNode.release();
+ tracker.update(indexed);
+ indexNode = tracker.acquireIndexNode("/oak:index/fooIndex");
+
+ //Now result would be latest from async + last local
+ td = doSearch(indexNode, "bar");
+ assertEquals(5, td.totalHits);
+
+ //Now there would be two NRTIndex - previous and current
+ //so add to current and query again
+ addDoc(queue, "/a/e", "bar");
+ td = doSearch(indexNode, "bar");
+ assertEquals(6, td.totalHits);
+
+ //Now do another async update
+ indexed = doAsyncIndex(indexed, "a3", "bar");
+
+ indexNode.release();
+ tracker.update(indexed);
+ indexNode = tracker.acquireIndexNode("/oak:index/fooIndex");
+
+ //Now total count would be 4
+ //3 from async and 1 from current
+ td = doSearch(indexNode, "bar");
+ assertEquals(4, td.totalHits);
+ }
+
+ //Adds a child node with the given foo value on top of 'current' and runs
+ //the async index hook over the change. Returns the resulting state
+ private NodeState doAsyncIndex(NodeState current, String childName, String fooValue) throws CommitFailedException {
+ //Have some stuff to be indexed
+ NodeBuilder builder = current.builder();
+ builder.child(childName).setProperty("foo", fooValue);
+ NodeState after = builder.getNodeState();
+ return asyncHook.processCommit(current, after, newCommitInfo());
+ }
+
+ //Term query on the 'foo' field returning at most 10 hits
+ private TopDocs doSearch(IndexNode indexNode, String fooValue) throws IOException {
+ return indexNode.getSearcher().search(new TermQuery(new Term("foo", fooValue)), 10);
+ }
+
+ //Queues an update for /oak:index/fooIndex made of a path field and a 'foo' field
+ private void addDoc(DocumentQueue queue, String docPath, String fooValue) {
+ Document d1 = new Document();
+ d1.add(newPathField(docPath));
+ d1.add(new StringField("foo", fooValue, Field.Store.NO));
+ queue.add(LuceneDoc.forUpdate("/oak:index/fooIndex", docPath, d1));
+ }
+
+ //Creates a tracker backed by an NRTIndexFactory (kept in 'indexFactory')
+ //with index files stored below the temporary folder
+ private IndexTracker createTracker() throws IOException {
+ IndexCopier indexCopier = new IndexCopier(sameThreadExecutor(), temporaryFolder.getRoot());
+ indexFactory = new NRTIndexFactory(indexCopier);
+ return new IndexTracker(
+ new DefaultIndexReaderFactory(defaultMountInfoProvider(), indexCopier),
+ indexFactory
+ );
+ }
+
+ //Defines 'fooIndex', adds node 'a' with foo=bar and runs the async index
+ //hook against an empty base state. Returns the indexed state
+ private NodeState createAndPopulateAsyncIndex() throws CommitFailedException {
+ createIndexDefinition("fooIndex");
+
+ //Have some stuff to be indexed
+ builder.child("a").setProperty("foo", "bar");
+ NodeState after = builder.getNodeState();
+ return asyncHook.processCommit(EMPTY_NODE, after, newCommitInfo());
+ }
+
+ //New CommitInfo carrying a fresh SimpleCommitContext; also kept in 'info'
+ private CommitInfo newCommitInfo(){
+ info = new CommitInfo("admin", "s1",
+ ImmutableMap.<String, Object>of(CommitContext.NAME, new SimpleCommitContext()));
+ return info;
+ }
+
+ //Lucene property index on 'foo' whose async property is then replaced by
+ //the two values "sync" and "async"
+ private void createIndexDefinition(String idxName) {
+ NodeBuilder idx = newLucenePropertyIndexDefinition(builder.child("oak:index"),
+ idxName, ImmutableSet.of("foo"), "async");
+ idx.setProperty(createProperty(IndexConstants.ASYNC_PROPERTY_NAME, of("sync" , "async"), STRINGS));
+ }
+
+}
\ No newline at end of file
Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.ContentRepository;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory;
+import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.query.AbstractQueryTest;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
+import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
+import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.api.Type.STRINGS;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LucenePropertyIndexTest.createIndex;
+import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
+import static org.apache.jackrabbit.oak.spi.mount.Mounts.defaultMountInfoProvider;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Exercises the hybrid Lucene index through a complete Oak repository. The
+ * repository is wired with a {@link LocalIndexObserver} feeding a
+ * {@link DocumentQueue}, and async indexing is only run when the test
+ * triggers it explicitly via {@link #runAsyncIndex()}.
+ */
+public class HybridIndexTest extends AbstractQueryTest {
+    /** Thread pool handed to the {@link IndexCopier}; released in {@link #shutdownExecutor()}. */
+    private final ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+    private NodeStore nodeStore;
+    private DocumentQueue queue;
+    private Clock clock = new Clock.Virtual();
+    private Whiteboard wb;
+
+    //TODO [hybrid] this needs to be obtained from NRTIndexFactory
+    private long refreshDelta = TimeUnit.SECONDS.toMillis(1);
+
+    /**
+     * Assembles the repository under test on top of a {@link MemoryNodeStore}.
+     * The {@link IndexTracker} is shared between the query index provider and
+     * the {@link DocumentQueue}; the queue runs its work on the calling thread.
+     *
+     * @return the content repository used by the query test base class
+     * @throws RuntimeException wrapping the {@link IOException} raised when the
+     *         {@link IndexCopier} cannot be created
+     */
+    @Override
+    protected ContentRepository createRepository() {
+        final IndexCopier copier;
+        try {
+            copier = new IndexCopier(executorService, temporaryFolder.getRoot());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        MountInfoProvider mip = defaultMountInfoProvider();
+        LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider(copier,
+                null,
+                null,
+                mip);
+
+        NRTIndexFactory nrtIndexFactory = new NRTIndexFactory(copier);
+        LuceneIndexReaderFactory indexReaderFactory = new DefaultIndexReaderFactory(mip, copier);
+        IndexTracker tracker = new IndexTracker(indexReaderFactory,nrtIndexFactory);
+        LuceneIndexProvider provider = new LuceneIndexProvider(tracker);
+
+        queue = new DocumentQueue(100, tracker, clock, sameThreadExecutor());
+        LocalIndexObserver localIndexObserver = new LocalIndexObserver(queue);
+
+        nodeStore = new MemoryNodeStore();
+        Oak oak = new Oak(nodeStore)
+                .with(new InitialContent())
+                .with(new OpenSecurityProvider())
+                .with((QueryIndexProvider) provider)
+                .with((Observer) provider)
+                .with(localIndexObserver)
+                .with(editorProvider)
+                .with(new PropertyIndexEditorProvider())
+                .with(new NodeTypeIndexProvider())
+                .with(new NodeCounterEditorProvider())
+                //Effectively disable async indexing auto run
+                //such that we can control run timing as per test requirement
+                .withAsyncIndexing("async", TimeUnit.DAYS.toSeconds(1));
+
+        wb = oak.getWhiteboard();
+        return oak.createContentRepository();
+    }
+
+    /**
+     * Stops the thread pool created for this test instance so its worker
+     * threads do not outlive the test. Falls back to {@code shutdownNow()}
+     * if already submitted tasks do not finish within five seconds.
+     */
+    @After
+    public void shutdownExecutor() throws InterruptedException {
+        executorService.shutdown();
+        if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Verifies that nodes committed after the initial async run show up in
+     * query results without a further async run, once the virtual clock has
+     * advanced past {@code refreshDelta} and another commit has been made.
+     */
+    @Test
+    public void hybridIndex() throws Exception{
+        String idxName = "hybridtest";
+        Tree idx = createIndex(root.getTree("/"), idxName, Collections.singleton("foo"));
+        idx.setProperty(createProperty(IndexConstants.ASYNC_PROPERTY_NAME, ImmutableSet.of("sync" , "async"), STRINGS));
+        root.commit();
+        //Run base reindex so reindex flag gets reset to false
+        runAsyncIndex();
+
+        //Get initial indexing done as local indexing only works
+        //for incremental indexing
+        createPath("/a").setProperty("foo", "bar");
+        root.commit();
+
+        //TODO This is required as LuceneIndexEditorContext has side effect of creating a facet
+        //config node
+        runAsyncIndex();
+
+        setTraversalEnabled(false);
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a"));
+
+        //Add new node. This would not be reflected in result as local index would not be updated
+        createPath("/b").setProperty("foo", "bar");
+        root.commit();
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a"));
+
+        //Now let some time elapse such that readers can be refreshed
+        clock.waitUntil(clock.getTime() + refreshDelta + 1);
+
+        //TODO This extra push would not be required once refresh also accounts for time
+        createPath("/c").setProperty("foo", "bar");
+        root.commit();
+
+        //Now recently added stuff should be visible without async indexing run
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a", "/b", "/c"));
+
+        //Post async index it should still be up to date
+        runAsyncIndex();
+        assertQuery("select [jcr:path] from [nt:base] where [foo] = 'bar'", of("/a", "/b", "/c"));
+    }
+
+    /**
+     * Looks up the {@link AsyncIndexUpdate} registered as a {@link Runnable}
+     * on the whiteboard, runs it on the calling thread and refreshes
+     * {@code root} afterwards. Fails the test if no such service is found.
+     */
+    private void runAsyncIndex() {
+        Runnable async = WhiteboardUtils.getService(wb, Runnable.class, new Predicate<Runnable>() {
+            @Override
+            public boolean apply(@Nullable Runnable input) {
+                return input instanceof AsyncIndexUpdate;
+            }
+        });
+        assertNotNull(async);
+        async.run();
+        root.refresh();
+    }
+
+    /**
+     * Adds one child per element of {@code path}, starting at the root tree.
+     *
+     * @param path absolute path whose elements are added in order
+     * @return the tree for the last path element
+     */
+    private Tree createPath(String path){
+        Tree base = root.getTree("/");
+        for (String e : PathUtils.elements(path)){
+            base = base.addChild(e);
+        }
+        return base;
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/HybridIndexTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java?rev=1760831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java Thu Sep 15 07:14:06 2016
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.jackrabbit.oak.core.SimpleCommitContext;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+
+public class LocalIndexObserverTest {
+ static final Executor NOOP_EXECUTOR = new Executor() {
+ @Override
+ public void execute(Runnable command) {
+
+ }
+ };
+
+ private IndexTracker tracker = new IndexTracker();
+ private DocumentQueue collectingQueue;
+ private LocalIndexObserver observer;
+
+ @Before
+ public void setUp(){
+ collectingQueue = new DocumentQueue(10, tracker, Clock.SIMPLE, NOOP_EXECUTOR);
+ observer = new LocalIndexObserver(collectingQueue);
+ }
+
+ @Test
+ public void nullCommitInfo() throws Exception{
+ observer.contentChanged(EMPTY_NODE, null);
+ }
+
+ @Test
+ public void noCommitContext() throws Exception{
+ observer.contentChanged(EMPTY_NODE, CommitInfo.EMPTY);
+ }
+
+ @Test
+ public void noDocHolder() throws Exception{
+ observer.contentChanged(EMPTY_NODE, newCommitInfo());
+ }
+
+ @Test
+ public void docsAddedToQueue() throws Exception{
+ CommitInfo info = newCommitInfo();
+ CommitContext cc = (CommitContext) info.getInfo().get(CommitContext.NAME);
+
+ LuceneDocumentHolder holder = new LuceneDocumentHolder();
+ holder.getAsyncIndexedDocList("foo").add(LuceneDoc.forDelete("foo", "bar"));
+
+ cc.set(LuceneDocumentHolder.NAME, holder);
+
+ observer.contentChanged(EMPTY_NODE, info);
+
+ assertEquals(1, collectingQueue.getQueuedDocs().size());
+ }
+
+ private CommitInfo newCommitInfo(){
+ return new CommitInfo("admin", "s1",
+ ImmutableMap.<String, Object>of(CommitContext.NAME, new SimpleCommitContext()));
+ }
+
+
+}
\ No newline at end of file
Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserverTest.java
------------------------------------------------------------------------------
svn:eol-style = native