Posted to commits@lucene.apache.org by sh...@apache.org on 2011/05/25 20:17:54 UTC
svn commit: r1127614 -
/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
Author: shaie
Date: Wed May 25 18:17:54 2011
New Revision: 1127614
URL: http://svn.apache.org/viewvc?rev=1127614&view=rev
Log:
LUCENE-3144: FreqProxTermsWriter leaks file handles
Modified:
lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
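In short: flush() could exit through an exception before the FormatPostings* consumers were finished, leaving the files they had opened for the new segment unclosed. The fix below wraps the flush and append paths in try/finally so that each consumer's finish() always runs. A minimal sketch of the pattern being applied (the class and method names here are illustrative stand-ins, not the actual Lucene 3.x classes):

    // Sketch of the bug pattern this commit fixes; PostingsConsumer and
    // flushLeaky/flushSafe are hypothetical names, not Lucene API.
    import java.io.Closeable;
    import java.io.IOException;

    final class TryFinallyFlushSketch {

      // Stand-in for a FormatPostings* consumer holding an open file handle.
      static final class PostingsConsumer implements Closeable {
        void write() throws IOException {
          throw new IOException("simulated failure during flush");
        }
        // Analogous to consumer.finish(): releases the underlying file.
        @Override
        public void close() throws IOException {
          System.out.println("file handle released");
        }
      }

      // Before the fix: if write() throws, close() is never reached -> leak.
      static void flushLeaky(PostingsConsumer consumer) throws IOException {
        consumer.write();
        consumer.close();
      }

      // After the fix: close() runs in finally, even when write() throws.
      static void flushSafe(PostingsConsumer consumer) throws IOException {
        try {
          consumer.write();
        } finally {
          consumer.close();
        }
      }

      public static void main(String[] args) {
        try {
          flushSafe(new PostingsConsumer());
        } catch (IOException expected) {
          // The exception still propagates; the handle was released first.
          System.out.println("caught: " + expected.getMessage());
        }
      }
    }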
Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java?rev=1127614&r1=1127613&r2=1127614&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java Wed May 25 18:17:54 2011
@@ -94,47 +94,48 @@ final class FreqProxTermsWriter extends
-> FormatPostingsPositionsConsumer
-> IMPL: FormatPostingsPositionsWriter
*/
-
- int start = 0;
- while(start < numAllFields) {
- final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
- final String fieldName = fieldInfo.name;
-
- int end = start+1;
- while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
- end++;
-
- FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
- for(int i=start;i<end;i++) {
- fields[i-start] = allFields.get(i);
-
- // Aggregate the storePayload as seen by the same
- // field across multiple threads
- if (!fieldInfo.omitTermFreqAndPositions) {
- fieldInfo.storePayloads |= fields[i-start].hasPayloads;
+ try {
+ int start = 0;
+ while(start < numAllFields) {
+ final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
+ final String fieldName = fieldInfo.name;
+
+ int end = start+1;
+ while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
+ end++;
+
+ FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
+ for(int i=start;i<end;i++) {
+ fields[i-start] = allFields.get(i);
+
+ // Aggregate the storePayload as seen by the same
+ // field across multiple threads
+ if (!fieldInfo.omitTermFreqAndPositions) {
+ fieldInfo.storePayloads |= fields[i-start].hasPayloads;
+ }
+ }
+
+ // If this field has postings then add them to the
+ // segment
+ appendPostings(fieldName, state, fields, consumer);
+
+ for(int i=0;i<fields.length;i++) {
+ TermsHashPerField perField = fields[i].termsHashPerField;
+ int numPostings = perField.numPostings;
+ perField.reset();
+ perField.shrinkHash(numPostings);
+ fields[i].reset();
}
+
+ start = end;
}
-
- // If this field has postings then add them to the
- // segment
- appendPostings(fieldName, state, fields, consumer);
-
- for(int i=0;i<fields.length;i++) {
- TermsHashPerField perField = fields[i].termsHashPerField;
- int numPostings = perField.numPostings;
- perField.reset();
- perField.shrinkHash(numPostings);
- fields[i].reset();
+
+ for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
+ FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
+ perThread.termsHashPerThread.reset(true);
}
-
- start = end;
- }
-
- for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
- FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
- perThread.termsHashPerThread.reset(true);
- }
-
- consumer.finish();
+ } finally {
+ consumer.finish();
+ }
}
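The hunk above is the flush() half of the fix: the per-field loop and the per-thread reset now sit inside try, and consumer.finish() moves into the finally block. A short sketch of the guarantee this buys, again with illustrative names (FieldsConsumer, appendField) rather than the real API:

    // Hedged sketch of the flush() change: the per-field loop runs inside
    // try, so finish() executes even if appending any field's postings throws.
    import java.io.IOException;
    import java.util.List;

    final class FlushFieldsSketch {

      // Hypothetical stand-in for the FormatPostingsFieldsConsumer chain.
      interface FieldsConsumer {
        void appendField(String field) throws IOException;
        void finish() throws IOException; // closes the segment's output files
      }

      static void flush(List<String> fields, FieldsConsumer consumer) throws IOException {
        try {
          for (String field : fields) {
            consumer.appendField(field); // may throw mid-loop (e.g. disk full)
          }
        } finally {
          consumer.finish(); // previously skipped on exception -> leaked handles
        }
      }
    }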
@@ -176,141 +177,147 @@ final class FreqProxTermsWriter extends
segDeletes = null;
}
- // TODO: really TermsHashPerField should take over most
- // of this loop, including merge sort of terms from
- // multiple threads and interacting with the
- // TermsConsumer, only calling out to us (passing us the
- // DocsConsumer) to handle delivery of docs/positions
- while(numFields > 0) {
-
- // Get the next term to merge
- termStates[0] = mergeStates[0];
- int numToMerge = 1;
-
- // TODO: pqueue
- for(int i=1;i<numFields;i++) {
- final char[] text = mergeStates[i].text;
- final int textOffset = mergeStates[i].textOffset;
- final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
-
- if (cmp < 0) {
- termStates[0] = mergeStates[i];
- numToMerge = 1;
- } else if (cmp == 0)
- termStates[numToMerge++] = mergeStates[i];
- }
+ try {
+ // TODO: really TermsHashPerField should take over most
+ // of this loop, including merge sort of terms from
+ // multiple threads and interacting with the
+ // TermsConsumer, only calling out to us (passing us the
+ // DocsConsumer) to handle delivery of docs/positions
+ while(numFields > 0) {
+
+ // Get the next term to merge
+ termStates[0] = mergeStates[0];
+ int numToMerge = 1;
+
+ // TODO: pqueue
+ for(int i=1;i<numFields;i++) {
+ final char[] text = mergeStates[i].text;
+ final int textOffset = mergeStates[i].textOffset;
+ final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
+
+ if (cmp < 0) {
+ termStates[0] = mergeStates[i];
+ numToMerge = 1;
+ } else if (cmp == 0)
+ termStates[numToMerge++] = mergeStates[i];
+ }
- final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
+ final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
- final int delDocLimit;
- if (segDeletes != null) {
- final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
- if (docIDUpto != null) {
- delDocLimit = docIDUpto;
+ final int delDocLimit;
+ if (segDeletes != null) {
+ final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
+ if (docIDUpto != null) {
+ delDocLimit = docIDUpto;
+ } else {
+ delDocLimit = 0;
+ }
} else {
delDocLimit = 0;
}
- } else {
- delDocLimit = 0;
- }
-
- // Now termStates has numToMerge FieldMergeStates
- // which all share the same term. Now we must
- // interleave the docID streams.
- while(numToMerge > 0) {
-
- FreqProxFieldMergeState minState = termStates[0];
- for(int i=1;i<numToMerge;i++)
- if (termStates[i].docID < minState.docID)
- minState = termStates[i];
-
- final int termDocFreq = minState.termFreq;
-
- final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
-
- // NOTE: we could check here if the docID was
- // deleted, and skip it. However, this is somewhat
- // dangerous because it can yield non-deterministic
- // behavior since we may see the docID before we see
- // the term that caused it to be deleted. This
- // would mean some (but not all) of its postings may
- // make it into the index, which'd alter the docFreq
- // for those terms. We could fix this by doing two
- // passes, ie first sweep marks all del docs, and
- // 2nd sweep does the real flush, but I suspect
- // that'd add too much time to flush.
-
- if (minState.docID < delDocLimit) {
- // Mark it deleted. TODO: we could also skip
- // writing its postings; this would be
- // deterministic (just for this Term's docs).
- if (state.deletedDocs == null) {
- state.deletedDocs = new BitVector(state.numDocs);
- }
- state.deletedDocs.set(minState.docID);
- }
-
- final ByteSliceReader prox = minState.prox;
-
- // Carefully copy over the prox + payload info,
- // changing the format to match Lucene's segment
- // format.
- if (!currentFieldOmitTermFreqAndPositions) {
- // omitTermFreqAndPositions == false so we do write positions &
- // payload
- int position = 0;
- for(int j=0;j<termDocFreq;j++) {
- final int code = prox.readVInt();
- position += code >> 1;
-
- final int payloadLength;
- if ((code & 1) != 0) {
- // This position has a payload
- payloadLength = prox.readVInt();
-
- if (payloadBuffer == null || payloadBuffer.length < payloadLength)
- payloadBuffer = new byte[payloadLength];
-
- prox.readBytes(payloadBuffer, 0, payloadLength);
-
- } else
- payloadLength = 0;
-
- posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
- } //End for
-
- posConsumer.finish();
- }
-
- if (!minState.nextDoc()) {
- // Remove from termStates
- int upto = 0;
- for(int i=0;i<numToMerge;i++)
- if (termStates[i] != minState)
- termStates[upto++] = termStates[i];
- numToMerge--;
- assert upto == numToMerge;
-
- // Advance this state to the next term
-
- if (!minState.nextTerm()) {
- // OK, no more terms, so remove from mergeStates
- // as well
- upto = 0;
- for(int i=0;i<numFields;i++)
- if (mergeStates[i] != minState)
- mergeStates[upto++] = mergeStates[i];
- numFields--;
- assert upto == numFields;
+ try {
+ // Now termStates has numToMerge FieldMergeStates
+ // which all share the same term. Now we must
+ // interleave the docID streams.
+ while(numToMerge > 0) {
+
+ FreqProxFieldMergeState minState = termStates[0];
+ for(int i=1;i<numToMerge;i++)
+ if (termStates[i].docID < minState.docID)
+ minState = termStates[i];
+
+ final int termDocFreq = minState.termFreq;
+
+ final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
+
+ // NOTE: we could check here if the docID was
+ // deleted, and skip it. However, this is somewhat
+ // dangerous because it can yield non-deterministic
+ // behavior since we may see the docID before we see
+ // the term that caused it to be deleted. This
+ // would mean some (but not all) of its postings may
+ // make it into the index, which'd alter the docFreq
+ // for those terms. We could fix this by doing two
+ // passes, ie first sweep marks all del docs, and
+ // 2nd sweep does the real flush, but I suspect
+ // that'd add too much time to flush.
+
+ if (minState.docID < delDocLimit) {
+ // Mark it deleted. TODO: we could also skip
+ // writing its postings; this would be
+ // deterministic (just for this Term's docs).
+ if (state.deletedDocs == null) {
+ state.deletedDocs = new BitVector(state.numDocs);
+ }
+ state.deletedDocs.set(minState.docID);
+ }
+
+ final ByteSliceReader prox = minState.prox;
+
+ // Carefully copy over the prox + payload info,
+ // changing the format to match Lucene's segment
+ // format.
+ if (!currentFieldOmitTermFreqAndPositions) {
+ // omitTermFreqAndPositions == false so we do write positions &
+ // payload
+ try {
+ int position = 0;
+ for(int j=0;j<termDocFreq;j++) {
+ final int code = prox.readVInt();
+ position += code >> 1;
+
+ final int payloadLength;
+ if ((code & 1) != 0) {
+ // This position has a payload
+ payloadLength = prox.readVInt();
+
+ if (payloadBuffer == null || payloadBuffer.length < payloadLength)
+ payloadBuffer = new byte[payloadLength];
+
+ prox.readBytes(payloadBuffer, 0, payloadLength);
+
+ } else
+ payloadLength = 0;
+
+ posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
+ } //End for
+ } finally {
+ posConsumer.finish();
+ }
+ }
+
+ if (!minState.nextDoc()) {
+
+ // Remove from termStates
+ int upto = 0;
+ for(int i=0;i<numToMerge;i++)
+ if (termStates[i] != minState)
+ termStates[upto++] = termStates[i];
+ numToMerge--;
+ assert upto == numToMerge;
+
+ // Advance this state to the next term
+
+ if (!minState.nextTerm()) {
+ // OK, no more terms, so remove from mergeStates
+ // as well
+ upto = 0;
+ for(int i=0;i<numFields;i++)
+ if (mergeStates[i] != minState)
+ mergeStates[upto++] = mergeStates[i];
+ numFields--;
+ assert upto == numFields;
+ }
+ }
}
+ } finally {
+ docConsumer.finish();
}
}
-
- docConsumer.finish();
+ } finally {
+ termsConsumer.finish();
}
-
- termsConsumer.finish();
}
final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();
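The second hunk applies the same idea one level down, in appendPostings(): the try/finally blocks nest to mirror the consumer chain, so termsConsumer.finish() is outermost, docConsumer.finish() runs per term, and posConsumer.finish() runs per document. An exception at any depth still releases every handle opened above it. Roughly, with illustrative interfaces standing in for the FormatPostings* hierarchy:

    // Hedged sketch of the nested try/finally structure introduced in
    // appendPostings(); the interfaces below are hypothetical, not Lucene API.
    import java.io.IOException;

    final class NestedFinishSketch {

      interface TermsConsumer {
        DocsConsumer addTerm(String term) throws IOException;
        void finish() throws IOException;
      }
      interface DocsConsumer {
        PositionsConsumer addDoc(int docID) throws IOException;
        void finish() throws IOException;
      }
      interface PositionsConsumer {
        void addPosition(int position) throws IOException;
        void finish() throws IOException;
      }

      static void appendPostings(TermsConsumer termsConsumer) throws IOException {
        try {
          DocsConsumer docsConsumer = termsConsumer.addTerm("term");
          try {
            PositionsConsumer posConsumer = docsConsumer.addDoc(0);
            try {
              posConsumer.addPosition(0); // may throw
            } finally {
              posConsumer.finish();       // innermost finally, as in the hunk above
            }
          } finally {
            docsConsumer.finish();
          }
        } finally {
          termsConsumer.finish();
        }
      }
    }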