You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this plain-text rendering.
Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC
svn commit: r1534320 [11/39] - in /lucene/dev/branches/lucene4956: ./
dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/
dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/
dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java Mon Oct 21 18:58:24 2013
@@ -410,7 +410,7 @@ public class CheckIndex {
// note: we only read the format byte (required preamble) here!
IndexInput input = null;
try {
- input = dir.openInput(segmentsFileName, IOContext.DEFAULT);
+ input = dir.openInput(segmentsFileName, IOContext.READONCE);
} catch (Throwable t) {
msg(infoStream, "ERROR: could not open segments file in directory");
if (infoStream != null)
@@ -496,6 +496,11 @@ public class CheckIndex {
msg(infoStream, " " + (1+i) + " of " + numSegments + ": name=" + info.info.name + " docCount=" + info.info.getDocCount());
segInfoStat.name = info.info.name;
segInfoStat.docCount = info.info.getDocCount();
+
+ final String version = info.info.getVersion();
+ if (info.info.getDocCount() <= 0 && version != null && versionComparator.compare(version, "4.5") >= 0) {
+ throw new RuntimeException("illegal number of documents: maxDoc=" + info.info.getDocCount());
+ }
int toLoseDocCount = info.info.getDocCount();
@@ -517,11 +522,6 @@ public class CheckIndex {
msg(infoStream, " diagnostics = " + diagnostics);
}
- Map<String,String> atts = info.info.attributes();
- if (atts != null && !atts.isEmpty()) {
- msg(infoStream, " attributes = " + atts);
- }
-
if (!info.hasDeletions()) {
msg(infoStream, " no deletions");
segInfoStat.hasDeletions = false;
@@ -744,10 +744,40 @@ public class CheckIndex {
continue;
}
+ final boolean hasFreqs = terms.hasFreqs();
final boolean hasPositions = terms.hasPositions();
+ final boolean hasPayloads = terms.hasPayloads();
final boolean hasOffsets = terms.hasOffsets();
- // term vectors cannot omit TF
- final boolean hasFreqs = isVectors || fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
+
+ // term vectors cannot omit TF:
+ final boolean expectedHasFreqs = (isVectors || fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0);
+
+ if (hasFreqs != expectedHasFreqs) {
+ throw new RuntimeException("field \"" + field + "\" should have hasFreqs=" + expectedHasFreqs + " but got " + hasFreqs);
+ }
+
+ if (hasFreqs == false) {
+ if (terms.getSumTotalTermFreq() != -1) {
+ throw new RuntimeException("field \"" + field + "\" hasFreqs is false, but Terms.getSumTotalTermFreq()=" + terms.getSumTotalTermFreq() + " (should be -1)");
+ }
+ }
+
+ if (!isVectors) {
+ final boolean expectedHasPositions = fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
+ if (hasPositions != expectedHasPositions) {
+ throw new RuntimeException("field \"" + field + "\" should have hasPositions=" + expectedHasPositions + " but got " + hasPositions);
+ }
+
+ final boolean expectedHasPayloads = fieldInfo.hasPayloads();
+ if (hasPayloads != expectedHasPayloads) {
+ throw new RuntimeException("field \"" + field + "\" should have hasPayloads=" + expectedHasPayloads + " but got " + hasPayloads);
+ }
+
+ final boolean expectedHasOffsets = fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
+ if (hasOffsets != expectedHasOffsets) {
+ throw new RuntimeException("field \"" + field + "\" should have hasOffsets=" + expectedHasOffsets + " but got " + hasOffsets);
+ }
+ }
final TermsEnum termsEnum = terms.iterator(null);
@@ -756,8 +786,6 @@ public class CheckIndex {
BytesRef lastTerm = null;
- Comparator<BytesRef> termComp = terms.getComparator();
-
long sumTotalTermFreq = 0;
long sumDocFreq = 0;
FixedBitSet visitedDocs = new FixedBitSet(maxDoc);
@@ -775,7 +803,7 @@ public class CheckIndex {
if (lastTerm == null) {
lastTerm = BytesRef.deepCopyOf(term);
} else {
- if (termComp.compare(lastTerm, term) >= 0) {
+ if (lastTerm.compareTo(term) >= 0) {
throw new RuntimeException("terms out of order: lastTerm=" + lastTerm + " term=" + term);
}
lastTerm.copyBytes(term);
@@ -789,6 +817,12 @@ public class CheckIndex {
docs = termsEnum.docs(liveDocs, docs);
postings = termsEnum.docsAndPositions(liveDocs, postings);
+
+ if (hasFreqs == false) {
+ if (termsEnum.totalTermFreq() != -1) {
+ throw new RuntimeException("field \"" + field + "\" hasFreqs is false, but TermsEnum.totalTermFreq()=" + termsEnum.totalTermFreq() + " (should be -1)");
+ }
+ }
if (hasOrd) {
long ord = -1;
@@ -831,6 +865,13 @@ public class CheckIndex {
}
status.totPos += freq;
totalTermFreq += freq;
+ } else {
+ // When a field didn't index freq, it must
+ // consistently "lie" and pretend that freq was
+ // 1:
+ if (docs2.freq() != 1) {
+ throw new RuntimeException("term " + term + ": doc " + doc + ": freq " + freq + " != 1 when Terms.hasFreqs() is false");
+ }
}
docCount++;
@@ -1275,7 +1316,8 @@ public class CheckIndex {
if (reader.getBinaryDocValues(fieldInfo.name) != null ||
reader.getNumericDocValues(fieldInfo.name) != null ||
reader.getSortedDocValues(fieldInfo.name) != null ||
- reader.getSortedSetDocValues(fieldInfo.name) != null) {
+ reader.getSortedSetDocValues(fieldInfo.name) != null ||
+ reader.getDocsWithField(fieldInfo.name) != null) {
throw new RuntimeException("field: " + fieldInfo.name + " has docvalues but should omit them!");
}
}
@@ -1296,26 +1338,37 @@ public class CheckIndex {
return status;
}
- private static void checkBinaryDocValues(String fieldName, AtomicReader reader, BinaryDocValues dv) {
+ private static void checkBinaryDocValues(String fieldName, AtomicReader reader, BinaryDocValues dv, Bits docsWithField) {
BytesRef scratch = new BytesRef();
for (int i = 0; i < reader.maxDoc(); i++) {
dv.get(i, scratch);
assert scratch.isValid();
+ if (docsWithField.get(i) == false && scratch.length > 0) {
+ throw new RuntimeException("dv for field: " + fieldName + " is missing but has value=" + scratch + " for doc: " + i);
+ }
}
}
- private static void checkSortedDocValues(String fieldName, AtomicReader reader, SortedDocValues dv) {
- checkBinaryDocValues(fieldName, reader, dv);
+ private static void checkSortedDocValues(String fieldName, AtomicReader reader, SortedDocValues dv, Bits docsWithField) {
+ checkBinaryDocValues(fieldName, reader, dv, docsWithField);
final int maxOrd = dv.getValueCount()-1;
FixedBitSet seenOrds = new FixedBitSet(dv.getValueCount());
int maxOrd2 = -1;
for (int i = 0; i < reader.maxDoc(); i++) {
int ord = dv.getOrd(i);
- if (ord < 0 || ord > maxOrd) {
+ if (ord == -1) {
+ if (docsWithField.get(i)) {
+ throw new RuntimeException("dv for field: " + fieldName + " has -1 ord but is not marked missing for doc: " + i);
+ }
+ } else if (ord < -1 || ord > maxOrd) {
throw new RuntimeException("ord out of bounds: " + ord);
+ } else {
+ if (!docsWithField.get(i)) {
+ throw new RuntimeException("dv for field: " + fieldName + " is missing but has ord=" + ord + " for doc: " + i);
+ }
+ maxOrd2 = Math.max(maxOrd2, ord);
+ seenOrds.set(ord);
}
- maxOrd2 = Math.max(maxOrd2, ord);
- seenOrds.set(ord);
}
if (maxOrd != maxOrd2) {
throw new RuntimeException("dv for field: " + fieldName + " reports wrong maxOrd=" + maxOrd + " but this is not the case: " + maxOrd2);
@@ -1337,7 +1390,7 @@ public class CheckIndex {
}
}
- private static void checkSortedSetDocValues(String fieldName, AtomicReader reader, SortedSetDocValues dv) {
+ private static void checkSortedSetDocValues(String fieldName, AtomicReader reader, SortedSetDocValues dv, Bits docsWithField) {
final long maxOrd = dv.getValueCount()-1;
OpenBitSet seenOrds = new OpenBitSet(dv.getValueCount());
long maxOrd2 = -1;
@@ -1345,16 +1398,28 @@ public class CheckIndex {
dv.setDocument(i);
long lastOrd = -1;
long ord;
- while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
- if (ord <= lastOrd) {
- throw new RuntimeException("ords out of order: " + ord + " <= " + lastOrd + " for doc: " + i);
+ if (docsWithField.get(i)) {
+ int ordCount = 0;
+ while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+ ordCount++;
+ if (ord <= lastOrd) {
+ throw new RuntimeException("ords out of order: " + ord + " <= " + lastOrd + " for doc: " + i);
+ }
+ if (ord < 0 || ord > maxOrd) {
+ throw new RuntimeException("ord out of bounds: " + ord);
+ }
+ lastOrd = ord;
+ maxOrd2 = Math.max(maxOrd2, ord);
+ seenOrds.set(ord);
}
- if (ord < 0 || ord > maxOrd) {
- throw new RuntimeException("ord out of bounds: " + ord);
+ if (ordCount == 0) {
+ throw new RuntimeException("dv for field: " + fieldName + " has no ordinals but is not marked missing for doc: " + i);
+ }
+ } else {
+ long o = dv.nextOrd();
+ if (o != SortedSetDocValues.NO_MORE_ORDS) {
+ throw new RuntimeException("dv for field: " + fieldName + " is marked missing but has ord=" + o + " for doc: " + i);
}
- lastOrd = ord;
- maxOrd2 = Math.max(maxOrd2, ord);
- seenOrds.set(ord);
}
}
if (maxOrd != maxOrd2) {
@@ -1378,17 +1443,26 @@ public class CheckIndex {
}
}
- private static void checkNumericDocValues(String fieldName, AtomicReader reader, NumericDocValues ndv) {
+ private static void checkNumericDocValues(String fieldName, AtomicReader reader, NumericDocValues ndv, Bits docsWithField) {
for (int i = 0; i < reader.maxDoc(); i++) {
- ndv.get(i);
+ long value = ndv.get(i);
+ if (docsWithField.get(i) == false && value != 0) {
+ throw new RuntimeException("dv for field: " + fieldName + " is marked missing but has value=" + value + " for doc: " + i);
+ }
}
}
private static void checkDocValues(FieldInfo fi, AtomicReader reader, PrintStream infoStream, DocValuesStatus status) throws Exception {
+ Bits docsWithField = reader.getDocsWithField(fi.name);
+ if (docsWithField == null) {
+ throw new RuntimeException(fi.name + " docsWithField does not exist");
+ } else if (docsWithField.length() != reader.maxDoc()) {
+ throw new RuntimeException(fi.name + " docsWithField has incorrect length: " + docsWithField.length() + ",expected: " + reader.maxDoc());
+ }
switch(fi.getDocValuesType()) {
case SORTED:
status.totalSortedFields++;
- checkSortedDocValues(fi.name, reader, reader.getSortedDocValues(fi.name));
+ checkSortedDocValues(fi.name, reader, reader.getSortedDocValues(fi.name), docsWithField);
if (reader.getBinaryDocValues(fi.name) != null ||
reader.getNumericDocValues(fi.name) != null ||
reader.getSortedSetDocValues(fi.name) != null) {
@@ -1397,7 +1471,7 @@ public class CheckIndex {
break;
case SORTED_SET:
status.totalSortedSetFields++;
- checkSortedSetDocValues(fi.name, reader, reader.getSortedSetDocValues(fi.name));
+ checkSortedSetDocValues(fi.name, reader, reader.getSortedSetDocValues(fi.name), docsWithField);
if (reader.getBinaryDocValues(fi.name) != null ||
reader.getNumericDocValues(fi.name) != null ||
reader.getSortedDocValues(fi.name) != null) {
@@ -1406,7 +1480,7 @@ public class CheckIndex {
break;
case BINARY:
status.totalBinaryFields++;
- checkBinaryDocValues(fi.name, reader, reader.getBinaryDocValues(fi.name));
+ checkBinaryDocValues(fi.name, reader, reader.getBinaryDocValues(fi.name), docsWithField);
if (reader.getNumericDocValues(fi.name) != null ||
reader.getSortedDocValues(fi.name) != null ||
reader.getSortedSetDocValues(fi.name) != null) {
@@ -1415,7 +1489,7 @@ public class CheckIndex {
break;
case NUMERIC:
status.totalNumericFields++;
- checkNumericDocValues(fi.name, reader, reader.getNumericDocValues(fi.name));
+ checkNumericDocValues(fi.name, reader, reader.getNumericDocValues(fi.name), docsWithField);
if (reader.getBinaryDocValues(fi.name) != null ||
reader.getSortedDocValues(fi.name) != null ||
reader.getSortedSetDocValues(fi.name) != null) {
@@ -1430,7 +1504,7 @@ public class CheckIndex {
private static void checkNorms(FieldInfo fi, AtomicReader reader, PrintStream infoStream) throws IOException {
switch(fi.getNormType()) {
case NUMERIC:
- checkNumericDocValues(fi.name, reader, reader.getNormValues(fi.name));
+ checkNumericDocValues(fi.name, reader, reader.getNormValues(fi.name), new Bits.MatchAllBits(reader.maxDoc()));
break;
default:
throw new AssertionError("wtf: " + fi.getNormType());
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java Mon Oct 21 18:58:24 2013
@@ -25,24 +25,32 @@ import java.util.Map;
import org.apache.lucene.search.Query;
import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
+import org.apache.lucene.util.MergedIterator;
class CoalescedDeletes {
final Map<Query,Integer> queries = new HashMap<Query,Integer>();
final List<Iterable<Term>> iterables = new ArrayList<Iterable<Term>>();
-
+ final List<NumericUpdate> numericDVUpdates = new ArrayList<NumericUpdate>();
+
@Override
public String toString() {
// note: we could add/collect more debugging information
- return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ")";
+ return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ",numericUpdates=" + numericDVUpdates.size() + ")";
}
void update(FrozenBufferedDeletes in) {
iterables.add(in.termsIterable());
- for(int queryIdx=0;queryIdx<in.queries.length;queryIdx++) {
+ for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
final Query query = in.queries[queryIdx];
queries.put(query, BufferedDeletes.MAX_INT);
}
+
+ for (NumericUpdate nu : in.updates) {
+ NumericUpdate clone = new NumericUpdate(nu.term, nu.field, nu.value);
+ clone.docIDUpto = Integer.MAX_VALUE;
+ numericDVUpdates.add(clone);
+ }
}
public Iterable<Term> termsIterable() {
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java Mon Oct 21 18:58:24 2013
@@ -85,7 +85,7 @@ final class DocFieldProcessor extends Do
// FreqProxTermsWriter does this with
// FieldInfo.storePayload.
FieldInfosWriter infosWriter = codec.fieldInfosFormat().getFieldInfosWriter();
- infosWriter.write(state.directory, state.segmentInfo.name, state.fieldInfos, IOContext.DEFAULT);
+ infosWriter.write(state.directory, state.segmentInfo.name, "", state.fieldInfos, IOContext.DEFAULT);
}
@Override
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java Mon Oct 21 18:58:24 2013
@@ -92,13 +92,9 @@ final class DocInverterPerField extends
fieldState.position += analyzed ? docState.analyzer.getPositionIncrementGap(fieldInfo.name) : 0;
}
- final TokenStream stream = field.tokenStream(docState.analyzer);
- // reset the TokenStream to the first token
- stream.reset();
-
- boolean success2 = false;
-
- try {
+ try (TokenStream stream = field.tokenStream(docState.analyzer)) {
+ // reset the TokenStream to the first token
+ stream.reset();
boolean hasMoreTokens = stream.incrementToken();
fieldState.attributeSource = stream;
@@ -175,15 +171,10 @@ final class DocInverterPerField extends
}
// trigger streams to perform end-of-stream operations
stream.end();
-
+ // TODO: maybe add some safety? then again, it's already checked
+ // when we come back around to the field...
+ fieldState.position += posIncrAttribute.getPositionIncrement();
fieldState.offset += offsetAttribute.endOffset();
- success2 = true;
- } finally {
- if (!success2) {
- IOUtils.closeWhileHandlingException(stream);
- } else {
- stream.close();
- }
}
fieldState.offset += analyzed ? docState.analyzer.getOffsetGap(fieldInfo.name) : 0;
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java Mon Oct 21 18:58:24 2013
@@ -20,7 +20,6 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.List;
import org.apache.lucene.codecs.PostingsFormat; // javadocs
@@ -611,11 +610,6 @@ public class DocTermOrds {
termsEnum = reader.fields().terms(field).iterator(null);
}
- @Override
- public Comparator<BytesRef> getComparator() {
- return termsEnum.getComparator();
- }
-
@Override
public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
return termsEnum.docs(liveDocs, reuse, flags);
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java Mon Oct 21 18:58:24 2013
@@ -143,7 +143,7 @@ final class DocValuesProcessor extends S
DocValuesWriter writer = writers.get(fieldInfo.name);
NumericDocValuesWriter numericWriter;
if (writer == null) {
- numericWriter = new NumericDocValuesWriter(fieldInfo, bytesUsed);
+ numericWriter = new NumericDocValuesWriter(fieldInfo, bytesUsed, true);
writers.put(fieldInfo.name, numericWriter);
} else if (!(writer instanceof NumericDocValuesWriter)) {
throw new IllegalArgumentException("Incompatible DocValues type: field \"" + fieldInfo.name + "\" changed from " + getTypeDesc(writer) + " to numeric");
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Oct 21 18:58:24 2013
@@ -19,18 +19,18 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
-import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.index.FieldInfos.FieldNumbers;
+import org.apache.lucene.index.IndexWriter.Event;
import org.apache.lucene.search.Query;
-import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
@@ -100,19 +100,15 @@ import org.apache.lucene.util.InfoStream
*/
final class DocumentsWriter {
- Directory directory;
+ private final Directory directory;
private volatile boolean closed;
- final InfoStream infoStream;
- Similarity similarity;
+ private final InfoStream infoStream;
- List<String> newFiles;
+ private final LiveIndexWriterConfig config;
- final IndexWriter indexWriter;
- final LiveIndexWriterConfig indexWriterConfig;
-
- private AtomicInteger numDocsInRAM = new AtomicInteger(0);
+ private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
@@ -125,73 +121,79 @@ final class DocumentsWriter {
*/
private volatile boolean pendingChangesInCurrentFullFlush;
- private Collection<String> abortedFiles; // List of files that were written before last abort()
-
- final IndexingChain chain;
-
final DocumentsWriterPerThreadPool perThreadPool;
final FlushPolicy flushPolicy;
final DocumentsWriterFlushControl flushControl;
+ private final IndexWriter writer;
+ private final Queue<Event> events;
+
- final Codec codec;
- DocumentsWriter(Codec codec, LiveIndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumbers globalFieldNumbers,
- BufferedDeletesStream bufferedDeletesStream) {
- this.codec = codec;
+ DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directory) {
this.directory = directory;
- this.indexWriter = writer;
+ this.config = config;
this.infoStream = config.getInfoStream();
- this.similarity = config.getSimilarity();
- this.indexWriterConfig = writer.getConfig();
this.perThreadPool = config.getIndexerThreadPool();
- this.chain = config.getIndexingChain();
- this.perThreadPool.initialize(this, globalFieldNumbers, config);
flushPolicy = config.getFlushPolicy();
- assert flushPolicy != null;
- flushPolicy.init(this);
- flushControl = new DocumentsWriterFlushControl(this, config);
+ this.writer = writer;
+ this.events = new ConcurrentLinkedQueue<Event>();
+ flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedDeletesStream);
}
-
- synchronized void deleteQueries(final Query... queries) throws IOException {
+
+ synchronized boolean deleteQueries(final Query... queries) throws IOException {
+ // TODO why is this synchronized?
+ final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
flushControl.doOnDelete();
- if (flushControl.doApplyAllDeletes()) {
- applyAllDeletes(deleteQueue);
- }
+ return applyAllDeletes(deleteQueue);
}
// TODO: we could check w/ FreqProxTermsWriter: if the
// term doesn't exist, don't bother buffering into the
// per-DWPT map (but still must go into the global map)
- synchronized void deleteTerms(final Term... terms) throws IOException {
+ synchronized boolean deleteTerms(final Term... terms) throws IOException {
+ // TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
flushControl.doOnDelete();
- if (flushControl.doApplyAllDeletes()) {
- applyAllDeletes(deleteQueue);
- }
+ return applyAllDeletes( deleteQueue);
+ }
+
+ synchronized boolean updateNumericDocValue(Term term, String field, Long value) throws IOException {
+ final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+ deleteQueue.addNumericUpdate(new NumericUpdate(term, field, value));
+ flushControl.doOnDelete();
+ return applyAllDeletes(deleteQueue);
}
DocumentsWriterDeleteQueue currentDeleteSession() {
return deleteQueue;
}
- private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
- if (deleteQueue != null && !flushControl.isFullFlush()) {
- ticketQueue.addDeletesAndPurge(this, deleteQueue);
+ private final boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+ if (flushControl.getAndResetApplyAllDeletes()) {
+ if (deleteQueue != null && !flushControl.isFullFlush()) {
+ ticketQueue.addDeletes(deleteQueue);
+ }
+ putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
+ return true;
}
- indexWriter.applyAllDeletes();
- indexWriter.flushCount.incrementAndGet();
+ return false;
}
+
+ final int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
+ if (forced) {
+ return ticketQueue.forcePurge(writer);
+ } else {
+ return ticketQueue.tryPurge(writer);
+ }
+ }
+
/** Returns how many docs are currently buffered in RAM. */
int getNumDocs() {
return numDocsInRAM.get();
}
- Collection<String> abortedFiles() {
- return abortedFiles;
- }
-
private void ensureOpen() throws AlreadyClosedException {
if (closed) {
throw new AlreadyClosedException("this IndexWriter is closed");
@@ -202,45 +204,37 @@ final class DocumentsWriter {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
- synchronized void abort() {
+ synchronized void abort(IndexWriter writer) {
+ assert !Thread.holdsLock(writer) : "IndexWriter lock should never be hold when aborting";
boolean success = false;
-
+ final Set<String> newFilesSet = new HashSet<String>();
try {
deleteQueue.clear();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "abort");
}
-
final int limit = perThreadPool.getActiveThreadState();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
try {
- if (perThread.isActive()) { // we might be closed
- try {
- perThread.dwpt.abort();
- } finally {
- perThread.dwpt.checkAndResetHasAborted();
- flushControl.doOnAbort(perThread);
- }
- } else {
- assert closed;
- }
+ abortThreadState(perThread, newFilesSet);
} finally {
perThread.unlock();
}
}
- flushControl.abortPendingFlushes();
+ flushControl.abortPendingFlushes(newFilesSet);
+ putEvent(new DeleteNewFilesEvent(newFilesSet));
flushControl.waitForFlush();
success = true;
} finally {
if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", "done abort; abortedFiles=" + abortedFiles + " success=" + success);
+ infoStream.message("DW", "done abort; abortedFiles=" + newFilesSet + " success=" + success);
}
}
}
- synchronized void lockAndAbortAll() {
+ synchronized void lockAndAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
@@ -249,20 +243,15 @@ final class DocumentsWriter {
try {
deleteQueue.clear();
final int limit = perThreadPool.getMaxThreadStates();
+ final Set<String> newFilesSet = new HashSet<String>();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
- if (perThread.isActive()) { // we might be closed or
- try {
- perThread.dwpt.abort();
- } finally {
- perThread.dwpt.checkAndResetHasAborted();
- flushControl.doOnAbort(perThread);
- }
- }
+ abortThreadState(perThread, newFilesSet);
}
deleteQueue.clear();
- flushControl.abortPendingFlushes();
+ flushControl.abortPendingFlushes(newFilesSet);
+ putEvent(new DeleteNewFilesEvent(newFilesSet));
flushControl.waitForFlush();
success = true;
} finally {
@@ -271,12 +260,31 @@ final class DocumentsWriter {
}
if (!success) {
// if something happens here we unlock all states again
- unlockAllAfterAbortAll();
+ unlockAllAfterAbortAll(indexWriter);
+ }
+ }
+ }
+
+ private final void abortThreadState(final ThreadState perThread, Set<String> newFiles) {
+ assert perThread.isHeldByCurrentThread();
+ if (perThread.isActive()) { // we might be closed
+ if (perThread.isInitialized()) {
+ try {
+ subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM());
+ perThread.dwpt.abort(newFiles);
+ } finally {
+ perThread.dwpt.checkAndResetHasAborted();
+ flushControl.doOnAbort(perThread);
+ }
+ } else {
+ flushControl.doOnAbort(perThread);
}
+ } else {
+ assert closed;
}
}
- final synchronized void unlockAllAfterAbortAll() {
+ final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAll");
@@ -334,7 +342,7 @@ final class DocumentsWriter {
private boolean preUpdate() throws IOException {
ensureOpen();
- boolean maybeMerge = false;
+ boolean hasEvents = false;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream.isEnabled("DW")) {
@@ -345,7 +353,7 @@ final class DocumentsWriter {
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
- maybeMerge |= doFlush(flushingDWPT);
+ hasEvents |= doFlush(flushingDWPT);
}
if (infoStream.isEnabled("DW")) {
@@ -361,28 +369,35 @@ final class DocumentsWriter {
infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy");
}
}
- return maybeMerge;
+ return hasEvents;
}
- private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
- if (flushControl.doApplyAllDeletes()) {
- applyAllDeletes(deleteQueue);
- }
+ private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
+ hasEvents |= applyAllDeletes(deleteQueue);
if (flushingDWPT != null) {
- maybeMerge |= doFlush(flushingDWPT);
+ hasEvents |= doFlush(flushingDWPT);
} else {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
- maybeMerge |= doFlush(nextPendingFlush);
+ hasEvents |= doFlush(nextPendingFlush);
}
}
- return maybeMerge;
+ return hasEvents;
+ }
+
+ private final void ensureInitialized(ThreadState state) {
+ if (state.isActive() && state.dwpt == null) {
+ final FieldInfos.Builder infos = new FieldInfos.Builder(
+ writer.globalFieldNumberMap);
+ state.dwpt = new DocumentsWriterPerThread(writer.newSegmentName(),
+ directory, config, infoStream, deleteQueue, infos);
+ }
}
boolean updateDocuments(final Iterable<? extends IndexDocument> docs, final Analyzer analyzer,
final Term delTerm) throws IOException {
- boolean maybeMerge = preUpdate();
+ boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
@@ -392,13 +407,19 @@ final class DocumentsWriter {
ensureOpen();
assert false: "perThread is not active but we are still open";
}
-
+ ensureInitialized(perThread);
+ assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
+ final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
numDocsInRAM.addAndGet(docCount);
} finally {
if (dwpt.checkAndResetHasAborted()) {
+ if (!dwpt.pendingFilesToDelete().isEmpty()) {
+ putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
+ }
+ subtractFlushedNumDocs(dwptNumDocs);
flushControl.doOnAbort(perThread);
}
}
@@ -408,31 +429,35 @@ final class DocumentsWriter {
perThread.unlock();
}
- return postUpdate(flushingDWPT, maybeMerge);
+ return postUpdate(flushingDWPT, hasEvents);
}
boolean updateDocument(final IndexDocument doc, final Analyzer analyzer,
final Term delTerm) throws IOException {
- boolean maybeMerge = preUpdate();
+ boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
-
try {
-
if (!perThread.isActive()) {
ensureOpen();
- throw new IllegalStateException("perThread is not active but we are still open");
+ assert false: "perThread is not active but we are still open";
}
-
+ ensureInitialized(perThread);
+ assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
+ final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
dwpt.updateDocument(doc, analyzer, delTerm);
numDocsInRAM.incrementAndGet();
} finally {
if (dwpt.checkAndResetHasAborted()) {
+ if (!dwpt.pendingFilesToDelete().isEmpty()) {
+ putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
+ }
+ subtractFlushedNumDocs(dwptNumDocs);
flushControl.doOnAbort(perThread);
}
}
@@ -442,13 +467,13 @@ final class DocumentsWriter {
perThread.unlock();
}
- return postUpdate(flushingDWPT, maybeMerge);
+ return postUpdate(flushingDWPT, hasEvents);
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
- boolean maybeMerge = false;
+ boolean hasEvents = false;
while (flushingDWPT != null) {
- maybeMerge = true;
+ hasEvents = true;
boolean success = false;
SegmentFlushTicket ticket = null;
try {
@@ -474,9 +499,24 @@ final class DocumentsWriter {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
- // flush concurrently without locking
- final FlushedSegment newSegment = flushingDWPT.flush();
- ticketQueue.addSegment(ticket, newSegment);
+ final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
+ boolean dwptSuccess = false;
+ try {
+ // flush concurrently without locking
+ final FlushedSegment newSegment = flushingDWPT.flush();
+ ticketQueue.addSegment(ticket, newSegment);
+ dwptSuccess = true;
+ } finally {
+ subtractFlushedNumDocs(flushingDocsInRam);
+ if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
+ putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
+ hasEvents = true;
+ }
+ if (!dwptSuccess) {
+ putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
+ hasEvents = true;
+ }
+ }
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
@@ -496,54 +536,38 @@ final class DocumentsWriter {
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers.
- ticketQueue.forcePurge(this);
- } else {
- ticketQueue.tryPurge(this);
+ putEvent(ForcedPurgeEvent.INSTANCE);
+ break;
}
-
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
- indexWriter.flushCount.incrementAndGet();
- indexWriter.doAfterFlush();
}
flushingDWPT = flushControl.nextPendingFlush();
}
-
+ if (hasEvents) {
+ putEvent(MergePendingEvent.INSTANCE);
+ }
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
- final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+ final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
}
- applyAllDeletes(deleteQueue);
+ hasEvents = true;
+ if (!this.applyAllDeletes(deleteQueue)) {
+ putEvent(ApplyDeletesEvent.INSTANCE);
+ }
}
- return maybeMerge;
+ return hasEvents;
}
-
- void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
- throws IOException {
- // Finish the flushed segment and publish it to IndexWriter
- if (newSegment == null) {
- assert bufferedDeletes != null;
- if (bufferedDeletes != null && bufferedDeletes.any()) {
- indexWriter.publishFrozenDeletes(bufferedDeletes);
- if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
- }
- }
- } else {
- publishFlushedSegment(newSegment, bufferedDeletes);
- }
- }
-
final void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
@@ -551,29 +575,6 @@ final class DocumentsWriter {
}
}
- /**
- * Publishes the flushed segment, segment private deletes (if any) and its
- * associated global delete (if present) to IndexWriter. The actual
- * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
- * delete generation is always GlobalPacket_deleteGeneration + 1
- */
- private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
- throws IOException {
- assert newSegment != null;
- assert newSegment.segmentInfo != null;
- final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
- //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
- if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
- }
-
- if (segmentDeletes != null && infoStream.isEnabled("DW")) {
- infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
- }
- // now publish!
- indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
- }
-
// for asserts
private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
@@ -588,7 +589,7 @@ final class DocumentsWriter {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
- final boolean flushAllThreads()
+ final boolean flushAllThreads(final IndexWriter indexWriter)
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
@@ -620,10 +621,9 @@ final class DocumentsWriter {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
- ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
- } else {
- ticketQueue.forcePurge(this);
- }
+ ticketQueue.addDeletes(flushingDeleteQueue);
+ }
+ ticketQueue.forcePurge(indexWriter);
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
@@ -641,11 +641,94 @@ final class DocumentsWriter {
// Release the flush lock
flushControl.finishFullFlush();
} else {
- flushControl.abortFullFlushes();
+ Set<String> newFilesSet = new HashSet<>();
+ flushControl.abortFullFlushes(newFilesSet);
+ putEvent(new DeleteNewFilesEvent(newFilesSet));
+
}
} finally {
pendingChangesInCurrentFullFlush = false;
}
}
+
+ public LiveIndexWriterConfig getIndexWriterConfig() {
+ return config;
+ }
+
+ private void putEvent(Event event) {
+ events.add(event);
+ }
+
+ static final class ApplyDeletesEvent implements Event {
+ static final Event INSTANCE = new ApplyDeletesEvent();
+ private int instCount = 0;
+ private ApplyDeletesEvent() {
+ assert instCount == 0;
+ instCount++;
+ }
+
+ @Override
+ public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ writer.applyDeletesAndPurge(true); // we always purge!
+ }
+ }
+
+ static final class MergePendingEvent implements Event {
+ static final Event INSTANCE = new MergePendingEvent();
+ private int instCount = 0;
+ private MergePendingEvent() {
+ assert instCount == 0;
+ instCount++;
+ }
+
+ @Override
+ public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ writer.doAfterSegmentFlushed(triggerMerge, forcePurge);
+ }
+ }
+
+ static final class ForcedPurgeEvent implements Event {
+ static final Event INSTANCE = new ForcedPurgeEvent();
+ private int instCount = 0;
+ private ForcedPurgeEvent() {
+ assert instCount == 0;
+ instCount++;
+ }
+
+ @Override
+ public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ writer.purge(true);
+ }
+ }
+
+ static class FlushFailedEvent implements Event {
+ private final SegmentInfo info;
+
+ public FlushFailedEvent(SegmentInfo info) {
+ this.info = info;
+ }
+
+ @Override
+ public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ writer.flushFailed(info);
+ }
+ }
+
+ static class DeleteNewFilesEvent implements Event {
+ private final Collection<String> files;
+
+ public DeleteNewFilesEvent(Collection<String> files) {
+ this.files = files;
+ }
+
+ @Override
+ public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ writer.deleteNewFiles(files);
+ }
+ }
+
+ public Queue<Event> eventQueue() {
+ return events;
+ }
}
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java Mon Oct 21 18:58:24 2013
@@ -107,6 +107,11 @@ final class DocumentsWriterDeleteQueue {
tryApplyGlobalSlice();
}
+ void addNumericUpdate(NumericUpdate update) {
+ add(new NumericUpdateNode(update));
+ tryApplyGlobalSlice();
+ }
+
/**
* invariant for document update
*/
@@ -380,6 +385,22 @@ final class DocumentsWriterDeleteQueue {
}
}
+ private static final class NumericUpdateNode extends Node<NumericUpdate> {
+
+ NumericUpdateNode(NumericUpdate update) {
+ super(update);
+ }
+
+ @Override
+ void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
+ bufferedDeletes.addNumericUpdate(item, docIDUpto);
+ }
+
+ @Override
+ public String toString() {
+ return "update=" + item;
+ }
+ }
private boolean forceApplyGlobalSlice() {
globalBufferLock.lock();
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Mon Oct 21 18:58:24 2013
@@ -23,9 +23,11 @@ import java.util.List;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@@ -66,14 +68,18 @@ final class DocumentsWriterFlushControl
private boolean closed = false;
private final DocumentsWriter documentsWriter;
private final LiveIndexWriterConfig config;
+ private final BufferedDeletesStream bufferedDeletesStream;
+ private final InfoStream infoStream;
- DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
+ DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedDeletesStream bufferedDeletesStream) {
+ this.infoStream = config.getInfoStream();
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
- this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.config = config;
+ this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.documentsWriter = documentsWriter;
+ this.bufferedDeletesStream = bufferedDeletesStream;
}
public synchronized long activeBytes() {
@@ -240,7 +246,6 @@ final class DocumentsWriterFlushControl
}
public synchronized void waitForFlush() {
- assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
while (flushingWriters.size() != 0) {
try {
this.wait();
@@ -277,7 +282,7 @@ final class DocumentsWriterFlushControl
}
assert assertMemory();
// Take it out of the loop this DWPT is stale
- perThreadPool.replaceForFlush(state, closed);
+ perThreadPool.reset(state, closed);
} finally {
updateStallState();
}
@@ -295,7 +300,7 @@ final class DocumentsWriterFlushControl
assert fullFlush : "can not block if fullFlush == false";
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed;
- dwpt = perThreadPool.replaceForFlush(perThread, closed);
+ dwpt = perThreadPool.reset(perThread, closed);
numPending--;
blockedFlushes.add(new BlockedFlush(dwpt, bytes));
}finally {
@@ -311,12 +316,12 @@ final class DocumentsWriterFlushControl
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
- if (perThread.isActive()) {
+ if (perThread.isInitialized()) {
assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed; // do that before
// replace!
- dwpt = perThreadPool.replaceForFlush(perThread, closed);
+ dwpt = perThreadPool.reset(perThread, closed);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
@@ -413,18 +418,18 @@ final class DocumentsWriterFlushControl
* Returns the number of delete terms in the global pool
*/
public int getNumGlobalTermDeletes() {
- return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
+ return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedDeletesStream.numTerms();
}
public long getDeleteBytesUsed() {
- return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
+ return documentsWriter.deleteQueue.bytesUsed() + bufferedDeletesStream.bytesUsed();
}
synchronized int numFlushingDWPT() {
return flushingWriters.size();
}
- public boolean doApplyAllDeletes() {
+ public boolean getAndResetApplyAllDeletes() {
return flushDeletes.getAndSet(false);
}
@@ -441,7 +446,7 @@ final class DocumentsWriterFlushControl
.currentThread(), documentsWriter);
boolean success = false;
try {
- if (perThread.isActive()
+ if (perThread.isInitialized()
&& perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
@@ -475,7 +480,10 @@ final class DocumentsWriterFlushControl
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
- if (!next.isActive()) {
+ if (!next.isInitialized()) {
+ if (closed && next.isActive()) {
+ perThreadPool.deactivateThreadState(next);
+ }
continue;
}
assert next.dwpt.deleteQueue == flushingQueue
@@ -515,7 +523,7 @@ final class DocumentsWriterFlushControl
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
- assert !next.isActive() || next.dwpt.deleteQueue == queue;
+ assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ;
} finally {
next.unlock();
}
@@ -526,12 +534,12 @@ final class DocumentsWriterFlushControl
private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
void addFlushableState(ThreadState perThread) {
- if (documentsWriter.infoStream.isEnabled("DWFC")) {
- documentsWriter.infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
+ if (infoStream.isEnabled("DWFC")) {
+ infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
}
final DocumentsWriterPerThread dwpt = perThread.dwpt;
assert perThread.isHeldByCurrentThread();
- assert perThread.isActive();
+ assert perThread.isInitialized();
assert fullFlush;
assert dwpt.deleteQueue != documentsWriter.deleteQueue;
if (dwpt.getNumDocsInRAM() > 0) {
@@ -545,11 +553,7 @@ final class DocumentsWriterFlushControl
fullFlushBuffer.add(flushingDWPT);
}
} else {
- if (closed) {
- perThreadPool.deactivateThreadState(perThread); // make this state inactive
- } else {
- perThreadPool.reinitThreadState(perThread);
- }
+ perThreadPool.reset(perThread, closed); // make this state inactive
}
}
@@ -594,19 +598,20 @@ final class DocumentsWriterFlushControl
return true;
}
- synchronized void abortFullFlushes() {
+ synchronized void abortFullFlushes(Set<String> newFiles) {
try {
- abortPendingFlushes();
+ abortPendingFlushes(newFiles);
} finally {
fullFlush = false;
}
}
- synchronized void abortPendingFlushes() {
+ synchronized void abortPendingFlushes(Set<String> newFiles) {
try {
for (DocumentsWriterPerThread dwpt : flushQueue) {
try {
- dwpt.abort();
+ documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM());
+ dwpt.abort(newFiles);
} catch (Throwable ex) {
// ignore - keep on aborting the flush queue
} finally {
@@ -617,7 +622,8 @@ final class DocumentsWriterFlushControl
try {
flushingWriters
.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
- blockedFlush.dwpt.abort();
+ documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
+ blockedFlush.dwpt.abort(newFiles);
} catch (Throwable ex) {
// ignore - keep on aborting the blocked queue
} finally {
@@ -670,8 +676,8 @@ final class DocumentsWriterFlushControl
* checked out DWPT are available
*/
void waitIfStalled() {
- if (documentsWriter.infoStream.isEnabled("DWFC")) {
- documentsWriter.infoStream.message("DWFC",
+ if (infoStream.isEnabled("DWFC")) {
+ infoStream.message("DWFC",
"waitIfStalled: numFlushesPending: " + flushQueue.size()
+ " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
+ " fullFlush: " + fullFlush);
@@ -686,5 +692,12 @@ final class DocumentsWriterFlushControl
return stallControl.anyStalledThreads();
}
+ /**
+ * Returns the {@link IndexWriter} {@link InfoStream}
+ */
+ public InfoStream getInfoStream() {
+ return infoStream;
+ }
+
}
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java Mon Oct 21 18:58:24 2013
@@ -34,8 +34,7 @@ class DocumentsWriterFlushQueue {
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();
- void addDeletesAndPurge(DocumentsWriter writer,
- DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+ void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
synchronized (this) {
incTickets();// first inc the ticket count - freeze opens
// a window for #anyChanges to fail
@@ -49,9 +48,6 @@ class DocumentsWriterFlushQueue {
}
}
}
- // don't hold the lock on the FlushQueue when forcing the purge - this blocks and deadlocks
- // if we hold the lock.
- forcePurge(writer);
}
private void incTickets() {
@@ -98,8 +94,9 @@ class DocumentsWriterFlushQueue {
return ticketCount.get() != 0;
}
- private void innerPurge(DocumentsWriter writer) throws IOException {
+ private int innerPurge(IndexWriter writer) throws IOException {
assert purgeLock.isHeldByCurrentThread();
+ int numPurged = 0;
while (true) {
final FlushTicket head;
final boolean canPublish;
@@ -108,6 +105,7 @@ class DocumentsWriterFlushQueue {
canPublish = head != null && head.canPublish(); // do this synced
}
if (canPublish) {
+ numPurged++;
try {
/*
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
@@ -116,6 +114,7 @@ class DocumentsWriterFlushQueue {
* be a ticket still in the queue.
*/
head.publish(writer);
+
} finally {
synchronized (this) {
// finally remove the published ticket from the queue
@@ -128,27 +127,31 @@ class DocumentsWriterFlushQueue {
break;
}
}
+ return numPurged;
}
- void forcePurge(DocumentsWriter writer) throws IOException {
+ int forcePurge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(this);
+ assert !Thread.holdsLock(writer);
purgeLock.lock();
try {
- innerPurge(writer);
+ return innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
- void tryPurge(DocumentsWriter writer) throws IOException {
+ int tryPurge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(this);
+ assert !Thread.holdsLock(writer);
if (purgeLock.tryLock()) {
try {
- innerPurge(writer);
+ return innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
+ return 0;
}
public int getTicketCount() {
@@ -169,8 +172,47 @@ class DocumentsWriterFlushQueue {
this.frozenDeletes = frozenDeletes;
}
- protected abstract void publish(DocumentsWriter writer) throws IOException;
+ protected abstract void publish(IndexWriter writer) throws IOException;
protected abstract boolean canPublish();
+
+ /**
+ * Publishes the flushed segment, segment private deletes (if any) and its
+ * associated global delete (if present) to IndexWriter. The actual
+ * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
+ * delete generation is always GlobalPacket_deleteGeneration + 1
+ */
+ protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
+ throws IOException {
+ assert newSegment != null;
+ assert newSegment.segmentInfo != null;
+ final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
+ //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
+ if (indexWriter.infoStream.isEnabled("DW")) {
+ indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
+ }
+
+ if (segmentDeletes != null && indexWriter.infoStream.isEnabled("DW")) {
+ indexWriter.infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
+ }
+ // now publish!
+ indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
+ }
+
+ protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+ throws IOException {
+ // Finish the flushed segment and publish it to IndexWriter
+ if (newSegment == null) {
+ assert bufferedDeletes != null;
+ if (bufferedDeletes != null && bufferedDeletes.any()) {
+ indexWriter.publishFrozenDeletes(bufferedDeletes);
+ if (indexWriter.infoStream.isEnabled("DW")) {
+ indexWriter.infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
+ }
+ }
+ } else {
+ publishFlushedSegment(indexWriter, newSegment, bufferedDeletes);
+ }
+ }
}
static final class GlobalDeletesTicket extends FlushTicket {
@@ -179,11 +221,11 @@ class DocumentsWriterFlushQueue {
super(frozenDeletes);
}
@Override
- protected void publish(DocumentsWriter writer) throws IOException {
+ protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
// its a global ticket - no segment to publish
- writer.finishFlush(null, frozenDeletes);
+ finishFlush(writer, null, frozenDeletes);
}
@Override
@@ -201,10 +243,10 @@ class DocumentsWriterFlushQueue {
}
@Override
- protected void publish(DocumentsWriter writer) throws IOException {
+ protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
- writer.finishFlush(segment, frozenDeletes);
+ finishFlush(writer, segment, frozenDeletes);
}
protected void setSegment(FlushedSegment segment) {
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Mon Oct 21 18:58:24 2013
@@ -17,11 +17,14 @@ package org.apache.lucene.index;
* limitations under the License.
*/
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
+
import java.io.IOException;
import java.text.NumberFormat;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
+import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
@@ -40,9 +43,6 @@ import org.apache.lucene.util.IntBlockPo
import org.apache.lucene.util.MutableBits;
import org.apache.lucene.util.RamUsageEstimator;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
-
class DocumentsWriterPerThread {
/**
@@ -144,7 +144,7 @@ class DocumentsWriterPerThread {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
- void abort() {
+ void abort(Set<String> createdFiles) {
//System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
hasAborted = aborting = true;
try {
@@ -157,10 +157,7 @@ class DocumentsWriterPerThread {
}
pendingDeletes.clear();
- deleteSlice = deleteQueue.newSlice();
- // Reset all postings data
- doAfterFlush();
-
+ createdFiles.addAll(directory.getCreatedFiles());
} finally {
aborting = false;
if (infoStream.isEnabled("DWPT")) {
@@ -169,9 +166,7 @@ class DocumentsWriterPerThread {
}
}
private final static boolean INFO_VERBOSE = false;
- final DocumentsWriter parent;
final Codec codec;
- final IndexWriter writer;
final TrackingDirectoryWrapper directory;
final Directory directoryOrig;
final DocState docState;
@@ -179,60 +174,62 @@ class DocumentsWriterPerThread {
final Counter bytesUsed;
SegmentWriteState flushState;
- //Deletes for our still-in-RAM (to be flushed next) segment
- BufferedDeletes pendingDeletes;
- SegmentInfo segmentInfo; // Current segment we are working on
+ // Deletes for our still-in-RAM (to be flushed next) segment
+ final BufferedDeletes pendingDeletes;
+ private final SegmentInfo segmentInfo; // Current segment we are working on
boolean aborting = false; // True if an abort is pending
boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting
private FieldInfos.Builder fieldInfos;
private final InfoStream infoStream;
private int numDocsInRAM;
- private int flushedDocCount;
- DocumentsWriterDeleteQueue deleteQueue;
- DeleteSlice deleteSlice;
+ final DocumentsWriterDeleteQueue deleteQueue;
+ private final DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
final Allocator byteBlockAllocator;
final IntBlockPool.Allocator intBlockAllocator;
private final LiveIndexWriterConfig indexWriterConfig;
- public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
- FieldInfos.Builder fieldInfos, IndexingChain indexingChain) {
+ public DocumentsWriterPerThread(String segmentName, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
+ FieldInfos.Builder fieldInfos) {
this.directoryOrig = directory;
this.directory = new TrackingDirectoryWrapper(directory);
- this.parent = parent;
this.fieldInfos = fieldInfos;
- this.writer = parent.indexWriter;
- this.indexWriterConfig = parent.indexWriterConfig;
- this.infoStream = parent.infoStream;
- this.codec = parent.codec;
+ this.indexWriterConfig = indexWriterConfig;
+ this.infoStream = infoStream;
+ this.codec = indexWriterConfig.getCodec();
this.docState = new DocState(this, infoStream);
- this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
+ this.docState.similarity = indexWriterConfig.getSimilarity();
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
pendingDeletes = new BufferedDeletes();
intBlockAllocator = new IntBlockAllocator(bytesUsed);
- initialize();
+ this.deleteQueue = deleteQueue;
+ assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
+ pendingDeletes.clear();
+ deleteSlice = deleteQueue.newSlice();
+
+ segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1, false, codec, null);
+ assert numDocsInRAM == 0;
+ if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
+ infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
+ }
// this should be the last call in the ctor
// it really sucks that we need to pull this within the ctor and pass this ref to the chain!
- consumer = indexingChain.getChain(this);
- }
-
- public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) {
- this(other.directoryOrig, other.parent, fieldInfos, other.parent.chain);
+ consumer = indexWriterConfig.getIndexingChain().getChain(this);
+
}
- void initialize() {
- deleteQueue = parent.deleteQueue;
- assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
- pendingDeletes.clear();
- deleteSlice = null;
- }
-
void setAborting() {
aborting = true;
}
+
+ boolean checkAndResetHasAborted() {
+ final boolean retval = hasAborted;
+ hasAborted = false;
+ return retval;
+ }
final boolean testPoint(String message) {
if (infoStream.isEnabled("TP")) {
@@ -240,12 +237,6 @@ class DocumentsWriterPerThread {
}
return true;
}
-
- boolean checkAndResetHasAborted() {
- final boolean retval = hasAborted;
- hasAborted = false;
- return retval;
- }
public void updateDocument(IndexDocument doc, Analyzer analyzer, Term delTerm) throws IOException {
assert testPoint("DocumentsWriterPerThread addDocument start");
@@ -253,9 +244,6 @@ class DocumentsWriterPerThread {
docState.doc = doc;
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
- if (segmentInfo == null) {
- initSegmentInfo();
- }
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
@@ -274,7 +262,7 @@ class DocumentsWriterPerThread {
deleteDocID(docState.docID);
numDocsInRAM++;
} else {
- abort();
+ abort(filesToDelete);
}
}
}
@@ -284,29 +272,16 @@ class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
- abort();
+ abort(filesToDelete);
}
}
finishDocument(delTerm);
}
- private void initSegmentInfo() {
- String segment = writer.newSegmentName();
- segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segment, -1,
- false, codec, null, null);
- assert numDocsInRAM == 0;
- if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
- infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
- }
- }
-
public int updateDocuments(Iterable<? extends IndexDocument> docs, Analyzer analyzer, Term delTerm) throws IOException {
assert testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
- if (segmentInfo == null) {
- initSegmentInfo();
- }
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
@@ -331,7 +306,7 @@ class DocumentsWriterPerThread {
// be called (because an exc is being thrown):
numDocsInRAM++;
} else {
- abort();
+ abort(filesToDelete);
}
}
}
@@ -341,7 +316,7 @@ class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
- abort();
+ abort(filesToDelete);
}
}
@@ -384,21 +359,18 @@ class DocumentsWriterPerThread {
* the updated slice we get from 1. holds all the deletes that have occurred
* since we updated the slice the last time.
*/
- if (deleteSlice == null) {
- deleteSlice = deleteQueue.newSlice();
- if (delTerm != null) {
- deleteQueue.add(delTerm, deleteSlice);
- deleteSlice.reset();
- }
-
- } else {
- if (delTerm != null) {
- deleteQueue.add(delTerm, deleteSlice);
- assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
- deleteSlice.apply(pendingDeletes, numDocsInRAM);
- } else if (deleteQueue.updateSlice(deleteSlice)) {
- deleteSlice.apply(pendingDeletes, numDocsInRAM);
- }
+ boolean applySlice = numDocsInRAM != 0;
+ if (delTerm != null) {
+ deleteQueue.add(delTerm, deleteSlice);
+ assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
+ } else {
+ applySlice &= deleteQueue.updateSlice(deleteSlice);
+ }
+
+ if (applySlice) {
+ deleteSlice.apply(pendingDeletes, numDocsInRAM);
+ } else { // if we don't need to apply we must reset!
+ deleteSlice.reset();
}
++numDocsInRAM;
}
@@ -434,14 +406,6 @@ class DocumentsWriterPerThread {
return numDocsInRAM;
}
- /** Reset after a flush */
- private void doAfterFlush() {
- segmentInfo = null;
- directory.getCreatedFiles().clear();
- fieldInfos = new FieldInfos.Builder(fieldInfos.globalFieldNumbers);
- parent.subtractFlushedNumDocs(numDocsInRAM);
- numDocsInRAM = 0;
- }
/**
* Prepares this DWPT for flushing. This method will freeze and return the
@@ -457,7 +421,7 @@ class DocumentsWriterPerThread {
// apply all deletes before we flush and release the delete slice
deleteSlice.apply(pendingDeletes, numDocsInRAM);
assert deleteSlice.isEmpty();
- deleteSlice = null;
+ deleteSlice.reset();
}
return globalDeletes;
}
@@ -465,11 +429,11 @@ class DocumentsWriterPerThread {
/** Flush all pending docs to a new segment */
FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
- assert deleteSlice == null : "all deletes must be applied in prepareFlush";
+ assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setDocCount(numDocsInRAM);
- flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
+ final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingDeletes, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
- final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.;
+ final double startMBUsed = bytesUsed() / 1024. / 1024.;
// Apply delete-by-docID now (delete-byDocID only
// happens when an exception is hit processing that
@@ -502,7 +466,7 @@ class DocumentsWriterPerThread {
pendingDeletes.terms.clear();
segmentInfo.setFiles(new HashSet<String>(directory.getCreatedFiles()));
- final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L);
+ final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L, -1L);
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : (flushState.segmentInfo.getDocCount() - flushState.delCountOnFlush)) + " deleted docs");
infoStream.message("DWPT", "new segment has " +
@@ -515,15 +479,12 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "flushed codec=" + codec);
}
- flushedDocCount += flushState.segmentInfo.getDocCount();
-
final BufferedDeletes segmentDeletes;
- if (pendingDeletes.queries.isEmpty()) {
+ if (pendingDeletes.queries.isEmpty() && pendingDeletes.numericUpdates.isEmpty()) {
pendingDeletes.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingDeletes;
- pendingDeletes = new BufferedDeletes();
}
if (infoStream.isEnabled("DWPT")) {
@@ -531,7 +492,7 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
" ramUsed=" + nf.format(startMBUsed) + " MB" +
" newFlushedSize(includes docstores)=" + nf.format(newSegmentSize) + " MB" +
- " docs/MB=" + nf.format(flushedDocCount / newSegmentSize));
+ " docs/MB=" + nf.format(flushState.segmentInfo.getDocCount() / newSegmentSize));
}
assert segmentInfo != null;
@@ -539,20 +500,21 @@ class DocumentsWriterPerThread {
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
sealFlushedSegment(fs);
- doAfterFlush();
success = true;
return fs;
} finally {
if (!success) {
- if (segmentInfo != null) {
- writer.flushFailed(segmentInfo);
- }
- abort();
+ abort(filesToDelete);
}
}
}
+ private final Set<String> filesToDelete = new HashSet<String>();
+
+ public Set<String> pendingFilesToDelete() {
+ return filesToDelete;
+ }
/**
* Seals the {@link SegmentInfo} for the new flushed segment and persists
* the deleted documents {@link MutableBits}.
@@ -568,12 +530,10 @@ class DocumentsWriterPerThread {
boolean success = false;
try {
+
if (indexWriterConfig.getUseCompoundFile()) {
-
- // Now build compound file
- Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
+ filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context));
newSegment.info.setUseCompoundFile(true);
- writer.deleteNewFiles(oldFiles);
}
// Have codec write SegmentInfo. Must do this after
@@ -615,10 +575,9 @@ class DocumentsWriterPerThread {
} finally {
if (!success) {
if (infoStream.isEnabled("DWPT")) {
- infoStream.message("DWPT", "hit exception " +
- "reating compound file for newly flushed segment " + newSegment.info.name);
+ infoStream.message("DWPT",
+ "hit exception creating compound file for newly flushed segment " + newSegment.info.name);
}
- writer.flushFailed(newSegment.info);
}
}
}
@@ -671,4 +630,5 @@ class DocumentsWriterPerThread {
+ ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborting=" + aborting + ", numDocsInRAM="
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
}
+
}
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Mon Oct 21 18:58:24 2013
@@ -71,12 +71,16 @@ abstract class DocumentsWriterPerThreadP
* for indexing anymore.
* @see #isActive()
*/
- private void resetWriter(DocumentsWriterPerThread dwpt) {
+
+ private void deactivate() {
assert this.isHeldByCurrentThread();
- if (dwpt == null) {
- isActive = false;
- }
- this.dwpt = dwpt;
+ isActive = false;
+ reset();
+ }
+
+ private void reset() {
+ assert this.isHeldByCurrentThread();
+ this.dwpt = null;
this.bytesUsed = 0;
this.flushPending = false;
}
@@ -91,6 +95,11 @@ abstract class DocumentsWriterPerThreadP
return isActive;
}
+ boolean isInitialized() {
+ assert this.isHeldByCurrentThread();
+ return isActive() && dwpt != null;
+ }
+
/**
* Returns the number of currently active bytes in this ThreadState's
* {@link DocumentsWriterPerThread}
@@ -121,9 +130,7 @@ abstract class DocumentsWriterPerThreadP
private ThreadState[] threadStates;
private volatile int numThreadStatesActive;
- private SetOnce<FieldNumbers> globalFieldMap = new SetOnce<FieldNumbers>();
- private SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
-
+
/**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
@@ -133,14 +140,8 @@ abstract class DocumentsWriterPerThreadP
}
threadStates = new ThreadState[maxNumThreadStates];
numThreadStatesActive = 0;
- }
-
- void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) {
- this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
- this.globalFieldMap.set(globalFieldMap);
for (int i = 0; i < threadStates.length; i++) {
- final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap);
- threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
+ threadStates[i] = new ThreadState(null);
}
}
@@ -158,9 +159,10 @@ abstract class DocumentsWriterPerThreadP
// should not happen
throw new RuntimeException(e);
}
- clone.documentsWriter = new SetOnce<DocumentsWriter>();
- clone.globalFieldMap = new SetOnce<FieldNumbers>();
clone.threadStates = new ThreadState[threadStates.length];
+ for (int i = 0; i < threadStates.length; i++) {
+ clone.threadStates[i] = new ThreadState(null);
+ }
return clone;
}
@@ -178,6 +180,7 @@ abstract class DocumentsWriterPerThreadP
int getActiveThreadState() {
return numThreadStatesActive;
}
+
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
@@ -198,8 +201,7 @@ abstract class DocumentsWriterPerThreadP
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
- assert threadState.dwpt != null;
- threadState.dwpt.initialize();
+ assert threadState.dwpt == null;
unlock = false;
return threadState;
}
@@ -220,7 +222,7 @@ abstract class DocumentsWriterPerThreadP
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
try {
- assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
+ assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive";
} finally {
threadStates[i].unlock();
}
@@ -236,24 +238,20 @@ abstract class DocumentsWriterPerThreadP
final ThreadState threadState = threadStates[i];
threadState.lock();
try {
- threadState.resetWriter(null);
+ threadState.deactivate();
} finally {
threadState.unlock();
}
}
}
- DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
+ DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
assert threadState.isHeldByCurrentThread();
- assert globalFieldMap.get() != null;
final DocumentsWriterPerThread dwpt = threadState.dwpt;
if (!closed) {
- final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get());
- final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
- newDwpt.initialize();
- threadState.resetWriter(newDwpt);
+ threadState.reset();
} else {
- threadState.resetWriter(null);
+ threadState.deactivate();
}
return dwpt;
}
@@ -328,18 +326,6 @@ abstract class DocumentsWriterPerThreadP
*/
void deactivateThreadState(ThreadState threadState) {
assert threadState.isActive();
- threadState.resetWriter(null);
- }
-
- /**
- * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
- * only be reinitialized if it is active without any pending documents.
- *
- * @param threadState the state to reinitialize
- */
- void reinitThreadState(ThreadState threadState) {
- assert threadState.isActive;
- assert threadState.dwpt.getNumDocsInRAM() == 0;
- threadState.dwpt.initialize();
+ threadState.deactivate();
}
}
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java Mon Oct 21 18:58:24 2013
@@ -47,6 +47,8 @@ public final class FieldInfo {
private Map<String,String> attributes;
+ private long dvGen = -1; // the DocValues generation of this field
+
/**
* Controls how much information is stored in the postings lists.
* @lucene.experimental
@@ -79,7 +81,7 @@ public final class FieldInfo {
* Character offsets are encoded alongside the positions.
*/
DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS,
- };
+ }
/**
* DocValues types.
@@ -92,32 +94,34 @@ public final class FieldInfo {
*/
NUMERIC,
/**
- * A per-document byte[].
+ * A per-document byte[]. Values may be larger than
+ * 32766 bytes, but different codecs may enforce their own limits.
*/
BINARY,
/**
* A pre-sorted byte[]. Fields with this type only store distinct byte values
* and store an additional offset pointer per document to dereference the shared
* byte[]. The stored byte[] is presorted and allows access via document id,
- * ordinal and by-value.
+ * ordinal and by-value. Values must be <= 32766 bytes.
*/
SORTED,
/**
* A pre-sorted Set<byte[]>. Fields with this type only store distinct byte values
* and store additional offset pointers per document to dereference the shared
* byte[]s. The stored byte[] is presorted and allows access via document id,
- * ordinal and by-value.
+ * ordinal and by-value. Values must be <= 32766 bytes.
*/
SORTED_SET
- };
+ }
/**
* Sole Constructor.
*
* @lucene.experimental
*/
- public FieldInfo(String name, boolean indexed, int number, boolean storeTermVector,
- boolean omitNorms, boolean storePayloads, IndexOptions indexOptions, DocValuesType docValues, DocValuesType normsType, Map<String,String> attributes) {
+ public FieldInfo(String name, boolean indexed, int number, boolean storeTermVector, boolean omitNorms,
+ boolean storePayloads, IndexOptions indexOptions, DocValuesType docValues, DocValuesType normsType,
+ Map<String,String> attributes) {
this.name = name;
this.indexed = indexed;
this.number = number;
@@ -222,6 +226,19 @@ public final class FieldInfo {
return docValueType;
}
+ /** Sets the docValues generation of this field. */
+ public void setDocValuesGen(long dvGen) {
+ this.dvGen = dvGen;
+ }
+
+ /**
+ * Returns the docValues generation of this field, or -1 if no docValues
+ * updates exist for it.
+ */
+ public long getDocValuesGen() {
+ return dvGen;
+ }
+
/**
* Returns {@link DocValuesType} of the norm. this may be null if the field has no norms.
*/