Posted to commits@lucene.apache.org by mi...@apache.org on 2014/11/26 11:25:16 UTC
svn commit: r1641789 - in /lucene/dev/branches/branch_5x/lucene:
core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/
test-framework/src/java/org/apache/lucene/util/
Author: mikemccand
Date: Wed Nov 26 10:25:15 2014
New Revision: 1641789
URL: http://svn.apache.org/r1641789
Log:
add fun test case for reindexing using ParallelLeafReader
Added:
lucene/dev/branches/branch_5x/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java (with props)
Modified:
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexReader.java
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReader.java
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java
lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
lucene/dev/branches/branch_5x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexReader.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexReader.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexReader.java Wed Nov 26 10:25:15 2014
@@ -20,7 +20,7 @@ package org.apache.lucene.index;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DocumentStoredFieldVisitor;
import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.Bits; // javadocs
import org.apache.lucene.util.IOUtils;
import java.io.Closeable;
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-// javadocs
/**
IndexReader is an abstract class, providing an interface for accessing a
@@ -100,7 +99,7 @@ public abstract class IndexReader implem
*/
public static interface ReaderClosedListener {
/** Invoked when the {@link IndexReader} is closed. */
- public void onClose(IndexReader reader);
+ public void onClose(IndexReader reader) throws IOException;
}
private final Set<ReaderClosedListener> readerClosedListeners =
@@ -192,7 +191,7 @@ public abstract class IndexReader implem
*/
public final void incRef() {
if (!tryIncRef()) {
- ensureOpen();
+ ensureOpen();
}
}
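
(Sketch, not part of this commit: now that ReaderClosedListener.onClose may throw
IOException, a listener that releases I/O resources can let failures propagate
instead of wrapping them. A minimal, hypothetical listener against the 5.x API:)

  import java.io.IOException;
  import org.apache.lucene.index.IndexReader;
  import org.apache.lucene.store.Directory;

  // Hypothetical listener: closes a privately owned Directory when its reader closes.
  class CloseDirectoryListener implements IndexReader.ReaderClosedListener {
    private final Directory dir;

    CloseDirectoryListener(Directory dir) {
      this.dir = dir;
    }

    @Override
    public void onClose(IndexReader reader) throws IOException {
      // Before this change, this close() had to be wrapped in an unchecked exception:
      dir.close();
    }
  }

(Registered via reader.addReaderClosedListener(new CloseDirectoryListener(dir)).)
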
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Wed Nov 26 10:25:15 2014
@@ -3836,6 +3836,14 @@ public class IndexWriter implements Clos
merge.readers.set(i, null);
}
}
+
+ try {
+ merge.mergeFinished();
+ } catch (Throwable t) {
+ if (th == null) {
+ th = t;
+ }
+ }
// If any error occurred, throw it.
if (!suppressExceptions) {
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReader.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReader.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReader.java Wed Nov 26 10:25:15 2014
@@ -80,7 +80,7 @@ public abstract class LeafReader extends
public static interface CoreClosedListener {
/** Invoked when the shared core of the original {@code
* SegmentReader} has closed. */
- public void onClose(Object ownerCoreCacheKey);
+ public void onClose(Object ownerCoreCacheKey) throws IOException;
}
private static class CoreClosedListenerWrapper implements ReaderClosedListener {
@@ -92,7 +92,7 @@ public abstract class LeafReader extends
}
@Override
- public void onClose(IndexReader reader) {
+ public void onClose(IndexReader reader) throws IOException {
listener.onClose(reader.getCoreCacheKey());
}
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/LeafReaderContext.java Wed Nov 26 10:25:15 2014
@@ -66,4 +66,9 @@ public final class LeafReaderContext ext
public LeafReader reader() {
return reader;
}
-}
\ No newline at end of file
+
+ @Override
+ public String toString() {
+ return "LeafReaderContext(" + reader + " docBase=" + docBase + " ord=" + ord + ")";
+ }
+}
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java Wed Nov 26 10:25:15 2014
@@ -129,6 +129,10 @@ public abstract class MergePolicy {
totalDocCount = count;
}
+ /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
+ public void mergeFinished() throws IOException {
+ }
+
/** Expert: Get the list of readers to merge. Note that this list does not
* necessarily match the list of segments to merge and should only be used
* to feed SegmentMerger to initialize a merge. When a {@link OneMerge}
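
(Sketch, not part of this commit: IndexWriter now invokes mergeFinished() after the
merge readers are closed, which is what lets the ReindexingOneMerge in the new test
below release its parallel readers. A hypothetical OneMerge subclass using the hook:)

  import java.io.Closeable;
  import java.io.IOException;
  import java.util.List;
  import org.apache.lucene.index.MergePolicy;
  import org.apache.lucene.index.SegmentCommitInfo;

  // Hypothetical OneMerge that frees a per-merge resource once the merge is done:
  class TrackedOneMerge extends MergePolicy.OneMerge {
    private final Closeable resource;

    TrackedOneMerge(List<SegmentCommitInfo> segments, Closeable resource) {
      super(segments);
      this.resource = resource;
    }

    @Override
    public void mergeFinished() throws IOException {
      // Called after all merge readers are closed, so it is safe to release
      // anything those readers depended on:
      resource.close();
    }
  }
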
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java Wed Nov 26 10:25:15 2014
@@ -28,7 +28,6 @@ import java.util.TreeMap;
import org.apache.lucene.util.Bits;
-
/** A {@link LeafReader} which reads multiple, parallel indexes. Each index
* added must have the same number of documents, but typically each contains
* different fields. Deletions are taken from the first reader.
@@ -322,4 +321,10 @@ public class ParallelLeafReader extends
reader.checkIntegrity();
}
}
+
+ /** Returns the {@link LeafReader}s that were passed on init. */
+ public LeafReader[] getParallelReaders() {
+ ensureOpen();
+ return parallelReaders;
+ }
}
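
(Sketch, not part of this commit: the new getParallelReaders() accessor lets a
subclass pick which sub-reader supplies deletions, which is the pattern the test
below uses. parallelReader and mainReader are assumed to be open LeafReaders over
the same document space:)

  // Take liveDocs/numDocs from the second reader instead of the first:
  LeafReader wrapped = new ParallelLeafReader(false, parallelReader, mainReader) {
    @Override
    public Bits getLiveDocs() {
      return getParallelReaders()[1].getLiveDocs();
    }

    @Override
    public int numDocs() {
      return getParallelReaders()[1].numDocs();
    }
  };
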
Modified: lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java (original)
+++ lucene/dev/branches/branch_5x/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java Wed Nov 26 10:25:15 2014
@@ -67,6 +67,19 @@ public final class ReaderManager extends
current = DirectoryReader.open(dir);
}
+ /**
+ * Creates and returns a new ReaderManager from the given
+ * already-opened {@link DirectoryReader}, stealing
+ * the incoming reference.
+ *
+ * @param reader the DirectoryReader to use for future reopens
+ *
+ * @throws IOException If there is a low-level I/O error
+ */
+ public ReaderManager(DirectoryReader reader) throws IOException {
+ current = reader;
+ }
+
@Override
protected void decRef(DirectoryReader reference) throws IOException {
reference.decRef();
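
(Sketch, not part of this commit: the new constructor takes ownership of an
already-opened reader, as the test below does with its FilterDirectoryReader
wrapper. writer is assumed to be an open IndexWriter:)

  // ReaderManager steals the incoming reference; no separate decRef is needed:
  DirectoryReader nrtReader = DirectoryReader.open(writer, true);
  ReaderManager mgr = new ReaderManager(nrtReader);

  DirectoryReader r = mgr.acquire();
  try {
    // ... run searches against r ...
  } finally {
    mgr.release(r);
  }
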
Added: lucene/dev/branches/branch_5x/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java?rev=1641789&view=auto
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java (added)
+++ lucene/dev/branches/branch_5x/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java Wed Nov 26 10:25:15 2014
@@ -0,0 +1,1339 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.LongField;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.StringHelper;
+import org.apache.lucene.util.TestUtil;
+
+// TODO:
+// - old parallel indices are only pruned on commit/close; can we do it on refresh?
+
+/** Simple example showing how to use ParallelLeafReader to index new
+ * stuff (postings, DVs, etc.) from previously stored fields, on the
+ * fly (during NRT reader reopen), after the initial indexing. The
+ * test indexes just a single stored field with text "number X" (X is
+ * a number embedded in the text).
+ *
+ * Then, on reopen, for any newly created segments (flush or merge), it
+ * builds a new parallel segment by loading all stored docs, parsing
+ * out that X, and adding it as a DV and a numeric indexed (trie) field.
+ *
+ * Finally, for searching, it builds a top-level MultiReader, with
+ * ParallelLeafReader for each segment, and then tests that random
+ * numeric range queries, and sorting by the new DV field, work
+ * correctly.
+ *
+ * Each per-segment index lives in a private directory next to the main
+ * index, and they are deleted once their segments are removed from the
+ * index. They are "volatile", meaning if e.g. the index is replicated to
+ * another machine, it's OK to not copy the parallel segment indices,
+ * since they will just be regenerated (at a cost though). */
+
+// @SuppressSysoutChecks(bugUrl="we print stuff")
+
+public class TestDemoParallelLeafReader extends LuceneTestCase {
+
+ static final boolean DEBUG = false;
+
+ static abstract class ReindexingReader implements Closeable {
+
+ /** Key used to store the current schema gen in the SegmentInfo diagnostics */
+ public final static String SCHEMA_GEN_KEY = "schema_gen";
+
+ public final IndexWriter w;
+ public final ReaderManager mgr;
+
+ private final Directory indexDir;
+ private final Path root;
+ private final Path segsPath;
+
+ /** Which segments have been closed, but their parallel index is not yet removed. */
+ private final Set<SegmentIDAndGen> closedSegments = Collections.newSetFromMap(new ConcurrentHashMap<SegmentIDAndGen,Boolean>());
+
+ /** Holds currently open parallel readers for each segment. */
+ private final Map<SegmentIDAndGen,LeafReader> parallelReaders = new ConcurrentHashMap<>();
+
+ void printRefCounts() {
+ System.out.println("All refCounts:");
+ for(Map.Entry<SegmentIDAndGen,LeafReader> ent : parallelReaders.entrySet()) {
+ System.out.println(" " + ent.getKey() + " " + ent.getValue() + " refCount=" + ent.getValue().getRefCount());
+ }
+ }
+
+ public ReindexingReader(Path root) throws IOException {
+ this.root = root;
+
+ // Normal index is stored under "index":
+ indexDir = openDirectory(root.resolve("index"));
+
+ // Per-segment parallel indices are stored under subdirs "segs":
+ segsPath = root.resolve("segs");
+ Files.createDirectories(segsPath);
+
+ IndexWriterConfig iwc = getIndexWriterConfig();
+ iwc.setMergePolicy(new ReindexingMergePolicy(iwc.getMergePolicy()));
+ w = new IndexWriter(indexDir, iwc);
+
+ w.getConfig().setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
+ @Override
+ public void warm(LeafReader reader) throws IOException {
+ // This will build the parallel index for the merged segment before the merge becomes visible, so reopen delay is only due to
+ // newly flushed segments:
+ if (DEBUG) System.out.println(Thread.currentThread().getName() +": TEST: now warm " + reader);
+ // TODO: it's not great that we pass false here; it means we close the reader & reopen again for NRT reader; still we did "warm" by
+ // building the parallel index, if necessary
+ getParallelLeafReader(reader, false, getCurrentSchemaGen());
+ }
+ });
+
+ // start with empty commit:
+ w.commit();
+ mgr = new ReaderManager(new ParallelLeafDirectoryReader(DirectoryReader.open(w, true)));
+ }
+
+ protected abstract IndexWriterConfig getIndexWriterConfig() throws IOException;
+
+ /** Optional method to validate that the provided parallel reader in fact reflects the changes in schemaGen. */
+ protected void checkParallelReader(LeafReader reader, LeafReader parallelReader, long schemaGen) throws IOException {
+ }
+
+ /** Override to customize Directory impl. */
+ protected Directory openDirectory(Path path) throws IOException {
+ return FSDirectory.open(path);
+ }
+
+ public void commit() throws IOException {
+ w.commit();
+ }
+
+ LeafReader getCurrentReader(LeafReader reader, long schemaGen) throws IOException {
+ LeafReader parallelReader = getParallelLeafReader(reader, true, schemaGen);
+ if (parallelReader != null) {
+
+ // We should not be embedding one ParallelLeafReader inside another:
+ assertFalse(parallelReader instanceof ParallelLeafReader);
+ assertFalse(reader instanceof ParallelLeafReader);
+
+ // NOTE: important that parallelReader is first: if there are field name overlaps (because schema changes
+ // overwrote existing field names), the parallel reader wins:
+ LeafReader newReader = new ParallelLeafReader(false, parallelReader, reader) {
+ @Override
+ public Bits getLiveDocs() {
+ return getParallelReaders()[1].getLiveDocs();
+ }
+ @Override
+ public int numDocs() {
+ return getParallelReaders()[1].numDocs();
+ }
+ };
+
+ // Because ParallelLeafReader does its own (extra) incRef:
+ parallelReader.decRef();
+
+ return newReader;
+
+ } else {
+ // This segment was already current as of currentSchemaGen:
+ return reader;
+ }
+ }
+
+ private class ParallelLeafDirectoryReader extends FilterDirectoryReader {
+ public ParallelLeafDirectoryReader(DirectoryReader in) {
+ super(in, new FilterDirectoryReader.SubReaderWrapper() {
+ final long currentSchemaGen = getCurrentSchemaGen();
+ @Override
+ public LeafReader wrap(LeafReader reader) {
+ try {
+ return getCurrentReader(reader, currentSchemaGen);
+ } catch (IOException ioe) {
+ // TODO: must close on exc here:
+ throw new RuntimeException(ioe);
+ }
+ }
+ });
+ }
+
+ @Override
+ protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
+ return new ParallelLeafDirectoryReader(in);
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ Throwable firstExc = null;
+ for (final LeafReader r : getSequentialSubReaders()) {
+ if (r instanceof ParallelLeafReader) {
+ // try to close each reader, even if an exception is thrown
+ try {
+ r.decRef();
+ } catch (Throwable t) {
+ if (firstExc == null) {
+ firstExc = t;
+ }
+ }
+ }
+ }
+ // Also close in, so it decRef's the SegmentInfos
+ try {
+ in.doClose();
+ } catch (Throwable t) {
+ if (firstExc == null) {
+ firstExc = t;
+ }
+ }
+ // throw the first exception
+ IOUtils.reThrow(firstExc);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ w.close();
+ if (DEBUG) System.out.println("TEST: after close writer index=" + SegmentInfos.readLatestCommit(indexDir).toString(indexDir));
+
+ /*
+ DirectoryReader r = mgr.acquire();
+ try {
+ TestUtil.checkReader(r);
+ } finally {
+ mgr.release(r);
+ }
+ */
+ mgr.close();
+ pruneOldSegments(true);
+ assertNoExtraSegments();
+ indexDir.close();
+ }
+
+ // Make sure we deleted all parallel indices for segments that are no longer in the main index:
+ private void assertNoExtraSegments() throws IOException {
+ Set<String> liveIDs = new HashSet<String>();
+ for(SegmentCommitInfo info : SegmentInfos.readLatestCommit(indexDir)) {
+ String idString = StringHelper.idToString(info.info.getId());
+ liveIDs.add(idString);
+ }
+
+ // At this point (closing) the only segments in closedSegments should be the still-live ones:
+ for(SegmentIDAndGen segIDGen : closedSegments) {
+ assertTrue(liveIDs.contains(segIDGen.segID));
+ }
+
+ boolean fail = false;
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(segsPath)) {
+ for (Path path : stream) {
+ SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString());
+ if (liveIDs.contains(segIDGen.segID) == false) {
+ if (DEBUG) System.out.println("TEST: fail seg=" + path.getFileName() + " is not live but still has a parallel index");
+ fail = true;
+ }
+ }
+ }
+ assertFalse(fail);
+ }
+
+ private static class SegmentIDAndGen {
+ public final String segID;
+ public final long schemaGen;
+
+ public SegmentIDAndGen(String segID, long schemaGen) {
+ this.segID = segID;
+ this.schemaGen = schemaGen;
+ }
+
+ public SegmentIDAndGen(String s) {
+ String[] parts = s.split("_");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("invalid SegmentIDAndGen \"" + s + "\"");
+ }
+ // TODO: better checking of segID?
+ segID = parts[0];
+ schemaGen = Long.parseLong(parts[1]);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (segID.hashCode() * schemaGen);
+ }
+
+ @Override
+ public boolean equals(Object _other) {
+ if (_other instanceof SegmentIDAndGen) {
+ SegmentIDAndGen other = (SegmentIDAndGen) _other;
+ return segID.equals(other.segID) && schemaGen == other.schemaGen;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return segID + "_" + schemaGen;
+ }
+ }
+
+ private class ParallelReaderClosed implements LeafReader.ReaderClosedListener {
+ private final SegmentIDAndGen segIDGen;
+ private final Directory dir;
+
+ public ParallelReaderClosed(SegmentIDAndGen segIDGen, Directory dir) {
+ this.segIDGen = segIDGen;
+ this.dir = dir;
+ }
+
+ @Override
+ public void onClose(IndexReader ignored) {
+ try {
+ // TODO: make this sync finer, i.e. just the segment + schemaGen
+ synchronized(ReindexingReader.this) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now close parallel parLeafReader dir=" + dir + " segIDGen=" + segIDGen);
+ parallelReaders.remove(segIDGen);
+ dir.close();
+ closedSegments.add(segIDGen);
+ }
+ } catch (IOException ioe) {
+ System.out.println("TEST: hit IOExc closing dir=" + dir);
+ ioe.printStackTrace(System.out);
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+
+ // Returns a ref
+ LeafReader getParallelLeafReader(final LeafReader leaf, boolean doCache, long schemaGen) throws IOException {
+ assert leaf instanceof SegmentReader;
+ SegmentInfo info = ((SegmentReader) leaf).getSegmentInfo().info;
+
+ long infoSchemaGen = getSchemaGen(info);
+
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: getParallelLeafReader: " + leaf + " infoSchemaGen=" + infoSchemaGen + " vs schemaGen=" + schemaGen + " doCache=" + doCache);
+
+ if (infoSchemaGen == schemaGen) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: segment is already current schemaGen=" + schemaGen + "; skipping");
+ return null;
+ }
+
+ if (infoSchemaGen > schemaGen) {
+ throw new IllegalStateException("segment infoSchemaGen (" + infoSchemaGen + ") cannot be greater than requested schemaGen (" + schemaGen + ")");
+ }
+
+ final SegmentIDAndGen segIDGen = new SegmentIDAndGen(StringHelper.idToString(info.getId()), schemaGen);
+
+ // While loop because the parallel reader may be closed out from under us, so we must retry:
+ while (true) {
+
+ // TODO: make this sync finer, i.e. just the segment + schemaGen
+ synchronized (this) {
+ LeafReader parReader = parallelReaders.get(segIDGen);
+
+ assert doCache || parReader == null;
+
+ if (parReader == null) {
+
+ Path leafIndex = segsPath.resolve(segIDGen.toString());
+
+ final Directory dir = openDirectory(leafIndex);
+
+ if (Files.exists(leafIndex.resolve("done")) == false) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: build segment index for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex);
+
+ if (dir.listAll().length != 0) {
+ // It crashed before finishing last time:
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: remove old incomplete index files: " + leafIndex);
+ IOUtils.rm(leafIndex);
+ }
+
+ reindex(infoSchemaGen, schemaGen, leaf, dir);
+
+ // Marker file, telling us this index is in fact done. This way if we crash while doing the reindexing for a given segment, we will
+ // later try again:
+ dir.createOutput("done", IOContext.DEFAULT).close();
+ } else {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: segment index already exists for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex);
+ }
+
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check index " + dir);
+ //TestUtil.checkIndex(dir);
+
+ SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
+ final LeafReader parLeafReader;
+ if (infos.size() == 1) {
+ parLeafReader = new SegmentReader(infos.info(0), IOContext.DEFAULT);
+ } else {
+ // This just means we didn't forceMerge above:
+ parLeafReader = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir));
+ }
+
+ //checkParallelReader(leaf, parLeafReader, schemaGen);
+
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: opened parallel reader: " + parLeafReader);
+ if (doCache) {
+ parallelReaders.put(segIDGen, parLeafReader);
+
+ // Our id+gen could have been previously closed, e.g. if it was a merged segment that was warmed, so we must clear this else
+ // the pruning may remove our directory:
+ closedSegments.remove(segIDGen);
+
+ parLeafReader.addReaderClosedListener(new ParallelReaderClosed(segIDGen, dir));
+
+ } else {
+ // Used only for merged segment warming:
+ // Messy: we close this reader now, instead of leaving open for reuse:
+ if (DEBUG) System.out.println("TEST: now decRef non cached refCount=" + parLeafReader.getRefCount());
+ parLeafReader.decRef();
+ dir.close();
+
+ // Must do this after dir is closed, else another thread could "rm -rf" while we are closing (which makes MDW.close's
+ // checkIndex angry):
+ closedSegments.add(segIDGen);
+ parReader = null;
+ }
+ parReader = parLeafReader;
+
+ } else {
+ if (parReader.tryIncRef() == false) {
+ // We failed: this reader just got closed by another thread, e.g. refresh thread opening a new reader, so this reader is now
+ // closed and we must try again.
+ if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: tryIncRef failed for " + parReader + "; retry");
+ parReader = null;
+ continue;
+ }
+ if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: use existing already opened parReader=" + parReader + " refCount=" + parReader.getRefCount());
+ //checkParallelReader(leaf, parReader, schemaGen);
+ }
+
+ // We return the new reference to caller
+ return parReader;
+ }
+ }
+ }
+
+ // TODO: we could pass a writer already opened...?
+ protected abstract void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException;
+
+ /** Returns the gen for the current schema. */
+ protected abstract long getCurrentSchemaGen();
+
+ /** Returns the gen that should be merged, meaning those changes will be folded back into the main index. */
+ protected long getMergingSchemaGen() {
+ return getCurrentSchemaGen();
+ }
+
+ /** Removes the parallel indices that are no longer in the last commit point. We can't
+ * remove them when the parallel reader is closed because they may still be referenced by
+ * the last commit. */
+ private void pruneOldSegments(boolean removeOldGens) throws IOException {
+ SegmentInfos lastCommit = SegmentInfos.readLatestCommit(indexDir);
+ if (DEBUG) System.out.println("TEST: prune");
+
+ Set<String> liveIDs = new HashSet<String>();
+ for(SegmentCommitInfo info : lastCommit) {
+ String idString = StringHelper.idToString(info.info.getId());
+ liveIDs.add(idString);
+ }
+
+ long currentSchemaGen = getCurrentSchemaGen();
+
+ if (Files.exists(segsPath)) {
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(segsPath)) {
+ for (Path path : stream) {
+ if (Files.isDirectory(path)) {
+ SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString());
+ assert segIDGen.schemaGen <= currentSchemaGen;
+ if (liveIDs.contains(segIDGen.segID) == false && (closedSegments.contains(segIDGen) || (removeOldGens && segIDGen.schemaGen < currentSchemaGen))) {
+ if (DEBUG) System.out.println("TEST: remove " + segIDGen);
+ try {
+ IOUtils.rm(path);
+ closedSegments.remove(segIDGen);
+ } catch (IOException ioe) {
+ // OK, we'll retry later
+ if (DEBUG) System.out.println("TEST: ignore ioe during delete " + path + ":" + ioe);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Just replaces the sub-readers with parallel readers, so reindexed fields are merged into new segments. */
+ private class ReindexingMergePolicy extends MergePolicy {
+
+ class ReindexingOneMerge extends OneMerge {
+
+ List<LeafReader> parallelReaders;
+ final long schemaGen;
+
+ ReindexingOneMerge(List<SegmentCommitInfo> segments) {
+ super(segments);
+ // Commit up front to which schemaGen we will merge; we don't want a schema change sneaking in for some of our leaf readers but not others:
+ schemaGen = getMergingSchemaGen();
+ long currentSchemaGen = getCurrentSchemaGen();
+
+ // Defensive sanity check:
+ if (schemaGen > currentSchemaGen) {
+ throw new IllegalStateException("currentSchemaGen (" + currentSchemaGen + ") must always be >= mergingSchemaGen (" + schemaGen + ")");
+ }
+ }
+
+ @Override
+ public List<LeafReader> getMergeReaders() throws IOException {
+ if (parallelReaders == null) {
+ parallelReaders = new ArrayList<>();
+ for (LeafReader reader : super.getMergeReaders()) {
+ parallelReaders.add(getCurrentReader(reader, schemaGen));
+ }
+ }
+
+ return parallelReaders;
+ }
+
+ @Override
+ public void mergeFinished() throws IOException {
+ Throwable th = null;
+ for(LeafReader r : parallelReaders) {
+ if (r instanceof ParallelLeafReader) {
+ try {
+ r.decRef();
+ } catch (Throwable t) {
+ if (th == null) {
+ th = t;
+ }
+ }
+ }
+ }
+
+ // If any error occurred, throw it.
+ IOUtils.reThrow(th);
+ }
+
+ @Override
+ public void setInfo(SegmentCommitInfo info) {
+ // Record that this merged segment is current as of this schemaGen:
+ info.info.getDiagnostics().put(SCHEMA_GEN_KEY, Long.toString(schemaGen));
+ super.setInfo(info);
+ }
+
+ @Override
+ public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
+ return super.getDocMap(mergeState);
+ }
+ }
+
+ class ReindexingMergeSpecification extends MergeSpecification {
+ @Override
+ public void add(OneMerge merge) {
+ super.add(new ReindexingOneMerge(merge.segments));
+ }
+
+ @Override
+ public String segString(Directory dir) {
+ return "ReindexingMergeSpec(" + super.segString(dir) + ")";
+ }
+ }
+
+ MergeSpecification wrap(MergeSpecification spec) {
+ MergeSpecification wrapped = null;
+ if (spec != null) {
+ wrapped = new ReindexingMergeSpecification();
+ for (OneMerge merge : spec.merges) {
+ wrapped.add(merge);
+ }
+ }
+ return wrapped;
+ }
+
+ final MergePolicy in;
+
+ /** Create a new {@code ReindexingMergePolicy} wrapping the given {@code MergePolicy}. */
+ public ReindexingMergePolicy(MergePolicy in) {
+ this.in = in;
+ }
+
+ @Override
+ public MergeSpecification findMerges(MergeTrigger mergeTrigger,
+ SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
+ return wrap(in.findMerges(mergeTrigger, segmentInfos, writer));
+ }
+
+ @Override
+ public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
+ int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
+ throws IOException {
+ // TODO: do we need to force-force this? Ie, wrapped MP may think index is already optimized, yet maybe its schemaGen is old? need test!
+ return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
+ }
+
+ @Override
+ public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
+ throws IOException {
+ return wrap(in.findForcedDeletesMerges(segmentInfos, writer));
+ }
+
+ @Override
+ public boolean useCompoundFile(SegmentInfos segments,
+ SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {
+ return in.useCompoundFile(segments, newSegment, writer);
+ }
+
+ @Override
+ public String toString() {
+ return "ReindexingMergePolicy(" + in + ")";
+ }
+ }
+
+ static long getSchemaGen(SegmentInfo info) {
+ String s = info.getDiagnostics().get(SCHEMA_GEN_KEY);
+ if (s == null) {
+ return -1;
+ } else {
+ return Long.parseLong(s);
+ }
+ }
+ }
+
+ private ReindexingReader getReindexer(Path root) throws IOException {
+ return new ReindexingReader(root) {
+ @Override
+ protected IndexWriterConfig getIndexWriterConfig() throws IOException {
+ return newIndexWriterConfig();
+ }
+
+ @Override
+ protected Directory openDirectory(Path path) throws IOException {
+ MockDirectoryWrapper dir = newMockFSDirectory(path);
+ dir.setUseSlowOpenClosers(false);
+ dir.setThrottling(Throttling.NEVER);
+ return dir;
+ }
+
+ @Override
+ protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
+ IndexWriterConfig iwc = newIndexWriterConfig();
+
+ // The order of our docIDs must precisely match the incoming reader:
+ iwc.setMergePolicy(new LogByteSizeMergePolicy());
+ IndexWriter w = new IndexWriter(parallelDir, iwc);
+ int maxDoc = reader.maxDoc();
+
+ // Slowly parse the stored field into a new doc values field:
+ for(int i=0;i<maxDoc;i++) {
+ // TODO: is this still O(blockSize^2)?
+ Document oldDoc = reader.document(i);
+ Document newDoc = new Document();
+ long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
+ newDoc.add(new NumericDocValuesField("number", value));
+ newDoc.add(new LongField("number", value, Field.Store.NO));
+ w.addDocument(newDoc);
+ }
+
+ if (random().nextBoolean()) {
+ w.forceMerge(1);
+ }
+
+ w.close();
+ }
+
+ @Override
+ protected long getCurrentSchemaGen() {
+ return 0;
+ }
+ };
+ }
+
+ /** Schema change by adding a new number_<schemaGen> DV field each time. */
+ private ReindexingReader getReindexerNewDVFields(Path root, final AtomicLong currentSchemaGen) throws IOException {
+ return new ReindexingReader(root) {
+ @Override
+ protected IndexWriterConfig getIndexWriterConfig() throws IOException {
+ return newIndexWriterConfig();
+ }
+
+ @Override
+ protected Directory openDirectory(Path path) throws IOException {
+ MockDirectoryWrapper dir = newMockFSDirectory(path);
+ dir.setUseSlowOpenClosers(false);
+ dir.setThrottling(Throttling.NEVER);
+ return dir;
+ }
+
+ @Override
+ protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
+ IndexWriterConfig iwc = newIndexWriterConfig();
+
+ // The order of our docIDs must precisely match the incoming reader:
+ iwc.setMergePolicy(new LogByteSizeMergePolicy());
+ IndexWriter w = new IndexWriter(parallelDir, iwc);
+ int maxDoc = reader.maxDoc();
+
+ if (oldSchemaGen <= 0) {
+ // Must slowly parse the stored field into a new doc values field:
+ for(int i=0;i<maxDoc;i++) {
+ // TODO: is this still O(blockSize^2)?
+ Document oldDoc = reader.document(i);
+ Document newDoc = new Document();
+ long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
+ newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, value));
+ newDoc.add(new LongField("number", value, Field.Store.NO));
+ w.addDocument(newDoc);
+ }
+ } else {
+ // Just carry over doc values from previous field:
+ NumericDocValues oldValues = reader.getNumericDocValues("number_" + oldSchemaGen);
+ assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
+ for(int i=0;i<maxDoc;i++) {
+ // TODO: is this still O(blockSize^2)?
+ Document oldDoc = reader.document(i);
+ Document newDoc = new Document();
+ newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, oldValues.get(i)));
+ w.addDocument(newDoc);
+ }
+ }
+
+ if (random().nextBoolean()) {
+ w.forceMerge(1);
+ }
+
+ w.close();
+ }
+
+ @Override
+ protected long getCurrentSchemaGen() {
+ return currentSchemaGen.get();
+ }
+
+ @Override
+ protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException {
+ String fieldName = "number_" + schemaGen;
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs field=" + fieldName + " r=" + r + " parR=" + parR);
+ NumericDocValues numbers = parR.getNumericDocValues(fieldName);
+ if (numbers == null) {
+ return;
+ }
+ int maxDoc = r.maxDoc();
+ boolean failed = false;
+ for(int i=0;i<maxDoc;i++) {
+ Document oldDoc = r.document(i);
+ long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
+ if (value != numbers.get(i)) {
+ if (DEBUG) System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
+ failed = true;
+ } else if (failed) {
+ if (DEBUG) System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i));
+ }
+ }
+ assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
+ }
+ };
+ }
+
+ /** Schema change by changing how the same "number" DV field is indexed. */
+ private ReindexingReader getReindexerSameDVField(Path root, final AtomicLong currentSchemaGen, final AtomicLong mergingSchemaGen) throws IOException {
+ return new ReindexingReader(root) {
+ @Override
+ protected IndexWriterConfig getIndexWriterConfig() throws IOException {
+ return newIndexWriterConfig();
+ }
+
+ @Override
+ protected Directory openDirectory(Path path) throws IOException {
+ MockDirectoryWrapper dir = newMockFSDirectory(path);
+ dir.setUseSlowOpenClosers(false);
+ dir.setThrottling(Throttling.NEVER);
+ return dir;
+ }
+
+ @Override
+ protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
+ IndexWriterConfig iwc = newIndexWriterConfig();
+
+ // The order of our docIDs must precisely match the incoming reader:
+ iwc.setMergePolicy(new LogByteSizeMergePolicy());
+ IndexWriter w = new IndexWriter(parallelDir, iwc);
+ int maxDoc = reader.maxDoc();
+
+ if (oldSchemaGen <= 0) {
+ // Must slowly parse the stored field into a new doc values field:
+ for(int i=0;i<maxDoc;i++) {
+ // TODO: is this still O(blockSize^2)?
+ Document oldDoc = reader.document(i);
+ Document newDoc = new Document();
+ long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
+ newDoc.add(new NumericDocValuesField("number", newSchemaGen*value));
+ newDoc.add(new LongField("number", value, Field.Store.NO));
+ w.addDocument(newDoc);
+ }
+ } else {
+ // Just carry over doc values from previous field:
+ NumericDocValues oldValues = reader.getNumericDocValues("number");
+ assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
+ for(int i=0;i<maxDoc;i++) {
+ // TODO: is this still O(blockSize^2)?
+ Document oldDoc = reader.document(i);
+ Document newDoc = new Document();
+ newDoc.add(new NumericDocValuesField("number", newSchemaGen*(oldValues.get(i)/oldSchemaGen)));
+ w.addDocument(newDoc);
+ }
+ }
+
+ if (random().nextBoolean()) {
+ w.forceMerge(1);
+ }
+
+ w.close();
+ }
+
+ @Override
+ protected long getCurrentSchemaGen() {
+ return currentSchemaGen.get();
+ }
+
+ @Override
+ protected long getMergingSchemaGen() {
+ return mergingSchemaGen.get();
+ }
+
+ @Override
+ protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs r=" + r + " parR=" + parR);
+ NumericDocValues numbers = parR.getNumericDocValues("number");
+ if (numbers == null) {
+ return;
+ }
+ int maxDoc = r.maxDoc();
+ boolean failed = false;
+ for(int i=0;i<maxDoc;i++) {
+ Document oldDoc = r.document(i);
+ long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
+ value *= schemaGen;
+ if (value != numbers.get(i)) {
+ System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
+ failed = true;
+ } else if (failed) {
+ System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i));
+ }
+ }
+ assertFalse("FAILED r=" + r, failed);
+ }
+ };
+ }
+
+ public void testBasicMultipleSchemaGens() throws Exception {
+
+ AtomicLong currentSchemaGen = new AtomicLong();
+
+ // TODO: separate refresh thread, search threads, indexing threads
+ ReindexingReader reindexer = getReindexerNewDVFields(createTempDir(), currentSchemaGen);
+ reindexer.commit();
+
+ Document doc = new Document();
+ doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
+ reindexer.w.addDocument(doc);
+
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: refresh @ 1 doc");
+ reindexer.mgr.maybeRefresh();
+ DirectoryReader r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+ //reindexer.printRefCounts();
+
+ currentSchemaGen.incrementAndGet();
+
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: increment schemaGen");
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: commit");
+ reindexer.commit();
+
+ doc = new Document();
+ doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
+ reindexer.w.addDocument(doc);
+
+ if (DEBUG) System.out.println("TEST: refresh @ 2 docs");
+ reindexer.mgr.maybeRefresh();
+ //reindexer.printRefCounts();
+ r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println("TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+
+ if (DEBUG) System.out.println("TEST: forceMerge");
+ reindexer.w.forceMerge(1);
+
+ currentSchemaGen.incrementAndGet();
+
+ if (DEBUG) System.out.println("TEST: commit");
+ reindexer.commit();
+
+ if (DEBUG) System.out.println("TEST: refresh after forceMerge");
+ reindexer.mgr.maybeRefresh();
+ r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println("TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+
+ if (DEBUG) System.out.println("TEST: close writer");
+ reindexer.close();
+ }
+
+ public void testRandomMultipleSchemaGens() throws Exception {
+
+ AtomicLong currentSchemaGen = new AtomicLong();
+ ReindexingReader reindexer = null;
+
+ // TODO: separate refresh thread, search threads, indexing threads
+ int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 2000);
+ int maxID = 0;
+ Path root = createTempDir();
+ int refreshEveryNumDocs = 100;
+ int commitCloseNumDocs = 1000;
+ for(int i=0;i<numDocs;i++) {
+ if (reindexer == null) {
+ reindexer = getReindexerNewDVFields(root, currentSchemaGen);
+ }
+
+ Document doc = new Document();
+ String id;
+ String updateID;
+ if (maxID > 0 && random().nextInt(10) == 7) {
+ // Replace a doc
+ id = "" + random().nextInt(maxID);
+ updateID = id;
+ } else {
+ id = "" + (maxID++);
+ updateID = null;
+ }
+
+ doc.add(newStringField("id", id, Field.Store.NO));
+ doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
+ if (updateID == null) {
+ reindexer.w.addDocument(doc);
+ } else {
+ reindexer.w.updateDocument(new Term("id", updateID), doc);
+ }
+ if (random().nextInt(refreshEveryNumDocs) == 17) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs");
+ reindexer.mgr.maybeRefresh();
+
+ DirectoryReader r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
+ try {
+ checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+ if (DEBUG) reindexer.printRefCounts();
+ refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
+ }
+
+ if (random().nextInt(500) == 17) {
+ currentSchemaGen.incrementAndGet();
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen);
+ }
+
+ if (i > 0 && random().nextInt(10) == 7) {
+ // Random delete:
+ reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i)));
+ }
+
+ if (random().nextInt(commitCloseNumDocs) == 17) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs");
+ reindexer.commit();
+ //reindexer.printRefCounts();
+ commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
+ }
+
+ // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
+ if (random().nextInt(commitCloseNumDocs) == 17) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs");
+ reindexer.close();
+ reindexer = null;
+ commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
+ }
+ }
+
+ if (reindexer != null) {
+ reindexer.close();
+ }
+ }
+
+ /** First schema change creates a new "number" DV field off the stored field; subsequent changes just change the value of that number
+ * field for all docs. */
+ public void testRandomMultipleSchemaGensSameField() throws Exception {
+
+ AtomicLong currentSchemaGen = new AtomicLong();
+ AtomicLong mergingSchemaGen = new AtomicLong();
+
+ ReindexingReader reindexer = null;
+
+ // TODO: separate refresh thread, search threads, indexing threads
+ int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 2000);
+ int maxID = 0;
+ Path root = createTempDir();
+ int refreshEveryNumDocs = 100;
+ int commitCloseNumDocs = 1000;
+
+ for(int i=0;i<numDocs;i++) {
+ if (reindexer == null) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: open new reader/writer");
+ reindexer = getReindexerSameDVField(root, currentSchemaGen, mergingSchemaGen);
+ }
+
+ Document doc = new Document();
+ String id;
+ String updateID;
+ if (maxID > 0 && random().nextInt(10) == 7) {
+ // Replace a doc
+ id = "" + random().nextInt(maxID);
+ updateID = id;
+ } else {
+ id = "" + (maxID++);
+ updateID = null;
+ }
+
+ doc.add(newStringField("id", id, Field.Store.NO));
+ doc.add(newTextField("text", "number " + TestUtil.nextInt(random(), -10000, 10000), Field.Store.YES));
+ if (updateID == null) {
+ reindexer.w.addDocument(doc);
+ } else {
+ reindexer.w.updateDocument(new Term("id", updateID), doc);
+ }
+ if (random().nextInt(refreshEveryNumDocs) == 17) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs");
+ reindexer.mgr.maybeRefresh();
+ DirectoryReader r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
+ try {
+ checkAllNumberDVs(r, "number", true, (int) currentSchemaGen.get());
+ } finally {
+ reindexer.mgr.release(r);
+ }
+ if (DEBUG) reindexer.printRefCounts();
+ refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
+ }
+
+ if (random().nextInt(500) == 17) {
+ currentSchemaGen.incrementAndGet();
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen);
+ if (random().nextBoolean()) {
+ mergingSchemaGen.incrementAndGet();
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance mergingSchemaGen to " + mergingSchemaGen);
+ }
+ }
+
+ if (i > 0 && random().nextInt(10) == 7) {
+ // Random delete:
+ reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i)));
+ }
+
+ if (random().nextInt(commitCloseNumDocs) == 17) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs");
+ reindexer.commit();
+ //reindexer.printRefCounts();
+ commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
+ }
+
+ // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
+ if (random().nextInt(commitCloseNumDocs) == 17) {
+ if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs");
+ reindexer.close();
+ reindexer = null;
+ commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
+ }
+ }
+
+ if (reindexer != null) {
+ reindexer.close();
+ }
+
+ // Verify main index never reflects schema changes beyond mergingSchemaGen:
+ try (Directory dir = newFSDirectory(root.resolve("index"));
+ IndexReader r = DirectoryReader.open(dir)) {
+ for (LeafReaderContext ctx : r.leaves()) {
+ LeafReader leaf = ctx.reader();
+ NumericDocValues numbers = leaf.getNumericDocValues("number");
+ if (numbers != null) {
+ int maxDoc = leaf.maxDoc();
+ for(int i=0;i<maxDoc;i++) {
+ Document doc = leaf.document(i);
+ long value = Long.parseLong(doc.get("text").split(" ")[1]);
+ long dvValue = numbers.get(i);
+ if (value == 0) {
+ assertEquals(0, dvValue);
+ } else {
+ assertTrue(dvValue % value == 0);
+ assertTrue(dvValue / value <= mergingSchemaGen.get());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void testBasic() throws Exception {
+ ReindexingReader reindexer = getReindexer(createTempDir());
+
+ // Start with initial empty commit:
+ reindexer.commit();
+
+ Document doc = new Document();
+ doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
+ reindexer.w.addDocument(doc);
+
+ if (DEBUG) System.out.println("TEST: refresh @ 1 doc");
+ reindexer.mgr.maybeRefresh();
+ DirectoryReader r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println("TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r);
+ IndexSearcher s = newSearcher(r);
+ testNumericDVSort(s);
+ testNumericRangeQuery(s);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+ //reindexer.printRefCounts();
+
+ if (DEBUG) System.out.println("TEST: commit");
+ reindexer.commit();
+
+ doc = new Document();
+ doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
+ reindexer.w.addDocument(doc);
+
+ if (DEBUG) System.out.println("TEST: refresh @ 2 docs");
+ reindexer.mgr.maybeRefresh();
+ //reindexer.printRefCounts();
+ r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println("TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r);
+ IndexSearcher s = newSearcher(r);
+ testNumericDVSort(s);
+ testNumericRangeQuery(s);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+
+ if (DEBUG) System.out.println("TEST: forceMerge");
+ reindexer.w.forceMerge(1);
+
+ if (DEBUG) System.out.println("TEST: commit");
+ reindexer.commit();
+
+ if (DEBUG) System.out.println("TEST: refresh after forceMerge");
+ reindexer.mgr.maybeRefresh();
+ r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println("TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r);
+ IndexSearcher s = newSearcher(r);
+ testNumericDVSort(s);
+ testNumericRangeQuery(s);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+
+ if (DEBUG) System.out.println("TEST: close writer");
+ reindexer.close();
+ }
+
+ public void testRandom() throws Exception {
+ Path root = createTempDir();
+ ReindexingReader reindexer = null;
+
+ // TODO: separate refresh thread, search threads, indexing threads
+ int numDocs = atLeast(3000);
+ int maxID = 0;
+ int refreshEveryNumDocs = 100;
+ for(int i=0;i<numDocs;i++) {
+ if (reindexer == null) {
+ reindexer = getReindexer(root);
+ }
+
+ Document doc = new Document();
+ String id;
+ String updateID;
+ if (maxID > 0 && random().nextInt(10) == 7) {
+ // Replace a doc
+ id = "" + random().nextInt(maxID);
+ updateID = id;
+ } else {
+ id = "" + (maxID++);
+ updateID = null;
+ }
+
+ doc.add(newStringField("id", id, Field.Store.NO));
+ doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
+ if (updateID == null) {
+ reindexer.w.addDocument(doc);
+ } else {
+ reindexer.w.updateDocument(new Term("id", updateID), doc);
+ }
+
+ if (random().nextInt(refreshEveryNumDocs) == 17) {
+ if (DEBUG) System.out.println("TEST: refresh @ " + (i+1) + " docs");
+ reindexer.mgr.maybeRefresh();
+ DirectoryReader r = reindexer.mgr.acquire();
+ if (DEBUG) System.out.println("TEST: got reader=" + r);
+ try {
+ checkAllNumberDVs(r);
+ IndexSearcher s = newSearcher(r);
+ testNumericDVSort(s);
+ testNumericRangeQuery(s);
+ } finally {
+ reindexer.mgr.release(r);
+ }
+ refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
+ }
+
+ if (i > 0 && random().nextInt(10) == 7) {
+ // Random delete:
+ reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i)));
+ }
+
+ if (random().nextInt(1000) == 17) {
+ if (DEBUG) System.out.println("TEST: commit @ " + (i+1) + " docs");
+ reindexer.commit();
+ }
+
+ // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
+ if (random().nextInt(1000) == 17) {
+ if (DEBUG) System.out.println("TEST: close writer @ " + (i+1) + " docs");
+ reindexer.close();
+ reindexer = null;
+ }
+ }
+ if (reindexer != null) {
+ reindexer.close();
+ }
+ }
+
+ private static void checkAllNumberDVs(IndexReader r) throws IOException {
+ checkAllNumberDVs(r, "number", true, 1);
+ }
+
+ private static void checkAllNumberDVs(IndexReader r, String fieldName, boolean doThrow, int multiplier) throws IOException {
+ NumericDocValues numbers = MultiDocValues.getNumericValues(r, fieldName);
+ int maxDoc = r.maxDoc();
+ boolean failed = false;
+ long t0 = System.currentTimeMillis();
+ for(int i=0;i<maxDoc;i++) {
+ Document oldDoc = r.document(i);
+ long value = multiplier * Long.parseLong(oldDoc.get("text").split(" ")[1]);
+ if (value != numbers.get(i)) {
+ System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
+ failed = true;
+ } else if (failed) {
+ System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i));
+ }
+ }
+ if (failed) {
+ if (r instanceof LeafReader == false) {
+ System.out.println("TEST FAILED; check leaves");
+ for(LeafReaderContext ctx : r.leaves()) {
+ System.out.println("CHECK LEAF=" + ctx.reader());
+ checkAllNumberDVs(ctx.reader(), fieldName, false, 1);
+ }
+ }
+ if (doThrow) {
+ assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
+ } else {
+ System.out.println("FAILED field=" + fieldName + " r=" + r);
+ }
+ }
+ }
+
+ private static void testNumericDVSort(IndexSearcher s) throws IOException {
+ // Confirm we can sort by the new DV field:
+ TopDocs hits = s.search(new MatchAllDocsQuery(), 100, new Sort(new SortField("number", SortField.Type.LONG)));
+ NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number");
+ long last = Long.MIN_VALUE;
+ for(ScoreDoc scoreDoc : hits.scoreDocs) {
+ long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]);
+ assertTrue(value >= last);
+ assertEquals(value, numbers.get(scoreDoc.doc));
+ last = value;
+ }
+ }
+
+ private static void testNumericRangeQuery(IndexSearcher s) throws IOException {
+ NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number");
+ for(int i=0;i<100;i++) {
+ // Confirm we can range search by the new indexed (numeric) field:
+ long min = random().nextLong();
+ long max = random().nextLong();
+ if (min > max) {
+ long x = min;
+ min = max;
+ max = x;
+ }
+
+ TopDocs hits = s.search(NumericRangeQuery.newLongRange("number", min, max, true, true), 100);
+ for(ScoreDoc scoreDoc : hits.scoreDocs) {
+ long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]);
+ assertTrue(value >= min);
+ assertTrue(value <= max);
+ assertEquals(value, numbers.get(scoreDoc.doc));
+ }
+ }
+ }
+
+ // TODO: test exceptions
+}
Modified: lucene/dev/branches/branch_5x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1641789&r1=1641788&r2=1641789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original)
+++ lucene/dev/branches/branch_5x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Wed Nov 26 10:25:15 2014
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.logging.Logger;
import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
@@ -863,6 +864,11 @@ public abstract class LuceneTestCase ext
}
/** create a new index writer config with random defaults */
+ public static IndexWriterConfig newIndexWriterConfig() {
+ return newIndexWriterConfig(new MockAnalyzer(random()));
+ }
+
+ /** create a new index writer config with random defaults */
public static IndexWriterConfig newIndexWriterConfig(Analyzer a) {
return newIndexWriterConfig(random(), a);
}
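
(Sketch, not part of this commit: the no-arg overload just defaults the analyzer to
new MockAnalyzer(random()), so tests that don't care about analysis, like the one
added above, can write the following. dir is assumed to be an open Directory:)

  IndexWriterConfig iwc = newIndexWriterConfig();   // random defaults, MockAnalyzer
  iwc.setMergePolicy(new LogByteSizeMergePolicy()); // e.g. to keep docID order stable
  IndexWriter w = new IndexWriter(dir, iwc);
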