Posted to commits@lucene.apache.org by sh...@apache.org on 2011/05/25 20:17:54 UTC

svn commit: r1127614 - /lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java

Author: shaie
Date: Wed May 25 18:17:54 2011
New Revision: 1127614

URL: http://svn.apache.org/viewvc?rev=1127614&view=rev
Log:
LUCENE-3144: FreqProxTermsWriter leaks file handles
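The change applies a try/finally cleanup pattern: the term- and doc-merging loops are wrapped in try blocks so that the nested consumers' finish() calls (termsConsumer, docConsumer, posConsumer), which flush and close the files backing the postings writers, still run when an exception escapes mid-flush. A minimal standalone sketch of that pattern, using hypothetical names (SketchConsumer, writeDoc) rather than the actual Lucene classes:

    import java.io.Closeable;
    import java.io.IOException;

    // Illustrative only: stands in for a postings consumer that holds an open file handle.
    final class SketchConsumer implements Closeable {
      void writeDoc(int docID) throws IOException { /* may throw mid-flush */ }
      @Override public void close() throws IOException { /* releases the file handle */ }

      static void flush(SketchConsumer consumer, int[] docIDs) throws IOException {
        try {
          for (int docID : docIDs) {
            consumer.writeDoc(docID);   // an exception here must not leak the handle
          }
        } finally {
          consumer.close();             // always runs, mirroring the finish() calls in this patch
        }
      }
    }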

Modified:
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java?rev=1127614&r1=1127613&r2=1127614&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java Wed May 25 18:17:54 2011
@@ -94,47 +94,48 @@ final class FreqProxTermsWriter extends 
                   -> FormatPostingsPositionsConsumer
                     -> IMPL: FormatPostingsPositionsWriter
     */
-
-    int start = 0;
-    while(start < numAllFields) {
-      final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
-      final String fieldName = fieldInfo.name;
-
-      int end = start+1;
-      while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
-        end++;
-      
-      FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
-      for(int i=start;i<end;i++) {
-        fields[i-start] = allFields.get(i);
-
-        // Aggregate the storePayload as seen by the same
-        // field across multiple threads
-        if (!fieldInfo.omitTermFreqAndPositions) {
-          fieldInfo.storePayloads |= fields[i-start].hasPayloads;
+    try {
+      int start = 0;
+      while(start < numAllFields) {
+        final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
+        final String fieldName = fieldInfo.name;
+        
+        int end = start+1;
+        while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
+          end++;
+        
+        FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
+        for(int i=start;i<end;i++) {
+          fields[i-start] = allFields.get(i);
+          
+          // Aggregate the storePayload as seen by the same
+          // field across multiple threads
+          if (!fieldInfo.omitTermFreqAndPositions) {
+            fieldInfo.storePayloads |= fields[i-start].hasPayloads;
+          }
+        }
+        
+        // If this field has postings then add them to the
+        // segment
+        appendPostings(fieldName, state, fields, consumer);
+        
+        for(int i=0;i<fields.length;i++) {
+          TermsHashPerField perField = fields[i].termsHashPerField;
+          int numPostings = perField.numPostings;
+          perField.reset();
+          perField.shrinkHash(numPostings);
+          fields[i].reset();
         }
+        
+        start = end;
       }
-
-      // If this field has postings then add them to the
-      // segment
-      appendPostings(fieldName, state, fields, consumer);
-
-      for(int i=0;i<fields.length;i++) {
-        TermsHashPerField perField = fields[i].termsHashPerField;
-        int numPostings = perField.numPostings;
-        perField.reset();
-        perField.shrinkHash(numPostings);
-        fields[i].reset();
+      
+      for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
+        FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
+        perThread.termsHashPerThread.reset(true);
       }
-
-      start = end;
+    } finally {
     }
-
-    for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
-      FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
-      perThread.termsHashPerThread.reset(true);
-    }
-
     consumer.finish();
   }
 
@@ -176,141 +177,147 @@ final class FreqProxTermsWriter extends 
       segDeletes = null;
     }
 
-    // TODO: really TermsHashPerField should take over most
-    // of this loop, including merge sort of terms from
-    // multiple threads and interacting with the
-    // TermsConsumer, only calling out to us (passing us the
-    // DocsConsumer) to handle delivery of docs/positions
-    while(numFields > 0) {
-
-      // Get the next term to merge
-      termStates[0] = mergeStates[0];
-      int numToMerge = 1;
-
-      // TODO: pqueue
-      for(int i=1;i<numFields;i++) {
-        final char[] text = mergeStates[i].text;
-        final int textOffset = mergeStates[i].textOffset;
-        final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
-
-        if (cmp < 0) {
-          termStates[0] = mergeStates[i];
-          numToMerge = 1;
-        } else if (cmp == 0)
-          termStates[numToMerge++] = mergeStates[i];
-      }
+    try {
+      // TODO: really TermsHashPerField should take over most
+      // of this loop, including merge sort of terms from
+      // multiple threads and interacting with the
+      // TermsConsumer, only calling out to us (passing us the
+      // DocsConsumer) to handle delivery of docs/positions
+      while(numFields > 0) {
+
+        // Get the next term to merge
+        termStates[0] = mergeStates[0];
+        int numToMerge = 1;
+
+        // TODO: pqueue
+        for(int i=1;i<numFields;i++) {
+          final char[] text = mergeStates[i].text;
+          final int textOffset = mergeStates[i].textOffset;
+          final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
+
+          if (cmp < 0) {
+            termStates[0] = mergeStates[i];
+            numToMerge = 1;
+          } else if (cmp == 0)
+            termStates[numToMerge++] = mergeStates[i];
+        }
 
-      final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
+        final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
 
-      final int delDocLimit;
-      if (segDeletes != null) {
-        final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
-        if (docIDUpto != null) {
-          delDocLimit = docIDUpto;
+        final int delDocLimit;
+        if (segDeletes != null) {
+          final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
+          if (docIDUpto != null) {
+            delDocLimit = docIDUpto;
+          } else {
+            delDocLimit = 0;
+          }
         } else {
           delDocLimit = 0;
         }
-      } else {
-        delDocLimit = 0;
-      }
-
-      // Now termStates has numToMerge FieldMergeStates
-      // which all share the same term.  Now we must
-      // interleave the docID streams.
-      while(numToMerge > 0) {
-        
-        FreqProxFieldMergeState minState = termStates[0];
-        for(int i=1;i<numToMerge;i++)
-          if (termStates[i].docID < minState.docID)
-            minState = termStates[i];
-
-        final int termDocFreq = minState.termFreq;
-
-        final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
-
-        // NOTE: we could check here if the docID was
-        // deleted, and skip it.  However, this is somewhat
-        // dangerous because it can yield non-deterministic
-        // behavior since we may see the docID before we see
-        // the term that caused it to be deleted.  This
-        // would mean some (but not all) of its postings may
-        // make it into the index, which'd alter the docFreq
-        // for those terms.  We could fix this by doing two
-        // passes, ie first sweep marks all del docs, and
-        // 2nd sweep does the real flush, but I suspect
-        // that'd add too much time to flush.
-
-        if (minState.docID < delDocLimit) {
-          // Mark it deleted.  TODO: we could also skip
-          // writing its postings; this would be
-          // deterministic (just for this Term's docs).
-          if (state.deletedDocs == null) {
-            state.deletedDocs = new BitVector(state.numDocs);
-          }
-          state.deletedDocs.set(minState.docID);
-        }
-
-        final ByteSliceReader prox = minState.prox;
-
-        // Carefully copy over the prox + payload info,
-        // changing the format to match Lucene's segment
-        // format.
-        if (!currentFieldOmitTermFreqAndPositions) {
-          // omitTermFreqAndPositions == false so we do write positions &
-          // payload          
-          int position = 0;
-          for(int j=0;j<termDocFreq;j++) {
-            final int code = prox.readVInt();
-            position += code >> 1;
-
-            final int payloadLength;
-            if ((code & 1) != 0) {
-              // This position has a payload
-              payloadLength = prox.readVInt();
-
-              if (payloadBuffer == null || payloadBuffer.length < payloadLength)
-                payloadBuffer = new byte[payloadLength];
-
-              prox.readBytes(payloadBuffer, 0, payloadLength);
-
-            } else
-              payloadLength = 0;
-
-            posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
-          } //End for
-
-          posConsumer.finish();
-        }
-
-        if (!minState.nextDoc()) {
 
-          // Remove from termStates
-          int upto = 0;
-          for(int i=0;i<numToMerge;i++)
-            if (termStates[i] != minState)
-              termStates[upto++] = termStates[i];
-          numToMerge--;
-          assert upto == numToMerge;
-
-          // Advance this state to the next term
-
-          if (!minState.nextTerm()) {
-            // OK, no more terms, so remove from mergeStates
-            // as well
-            upto = 0;
-            for(int i=0;i<numFields;i++)
-              if (mergeStates[i] != minState)
-                mergeStates[upto++] = mergeStates[i];
-            numFields--;
-            assert upto == numFields;
+        try {
+          // Now termStates has numToMerge FieldMergeStates
+          // which all share the same term.  Now we must
+          // interleave the docID streams.
+          while(numToMerge > 0) {
+            
+            FreqProxFieldMergeState minState = termStates[0];
+            for(int i=1;i<numToMerge;i++)
+              if (termStates[i].docID < minState.docID)
+                minState = termStates[i];
+
+            final int termDocFreq = minState.termFreq;
+
+            final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
+
+            // NOTE: we could check here if the docID was
+            // deleted, and skip it.  However, this is somewhat
+            // dangerous because it can yield non-deterministic
+            // behavior since we may see the docID before we see
+            // the term that caused it to be deleted.  This
+            // would mean some (but not all) of its postings may
+            // make it into the index, which'd alter the docFreq
+            // for those terms.  We could fix this by doing two
+            // passes, ie first sweep marks all del docs, and
+            // 2nd sweep does the real flush, but I suspect
+            // that'd add too much time to flush.
+
+            if (minState.docID < delDocLimit) {
+              // Mark it deleted.  TODO: we could also skip
+              // writing its postings; this would be
+              // deterministic (just for this Term's docs).
+              if (state.deletedDocs == null) {
+                state.deletedDocs = new BitVector(state.numDocs);
+              }
+              state.deletedDocs.set(minState.docID);
+            }
+
+            final ByteSliceReader prox = minState.prox;
+
+            // Carefully copy over the prox + payload info,
+            // changing the format to match Lucene's segment
+            // format.
+            if (!currentFieldOmitTermFreqAndPositions) {
+              // omitTermFreqAndPositions == false so we do write positions &
+              // payload  
+              try {
+                int position = 0;
+                for(int j=0;j<termDocFreq;j++) {
+                  final int code = prox.readVInt();
+                  position += code >> 1;
+                
+                final int payloadLength;
+                if ((code & 1) != 0) {
+                  // This position has a payload
+                  payloadLength = prox.readVInt();
+                  
+                  if (payloadBuffer == null || payloadBuffer.length < payloadLength)
+                    payloadBuffer = new byte[payloadLength];
+                  
+                  prox.readBytes(payloadBuffer, 0, payloadLength);
+                  
+                } else
+                  payloadLength = 0;
+                
+                posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
+                } //End for
+              } finally {
+                posConsumer.finish();
+              }
+            }
+
+            if (!minState.nextDoc()) {
+
+              // Remove from termStates
+              int upto = 0;
+              for(int i=0;i<numToMerge;i++)
+                if (termStates[i] != minState)
+                  termStates[upto++] = termStates[i];
+              numToMerge--;
+              assert upto == numToMerge;
+
+              // Advance this state to the next term
+
+              if (!minState.nextTerm()) {
+                // OK, no more terms, so remove from mergeStates
+                // as well
+                upto = 0;
+                for(int i=0;i<numFields;i++)
+                  if (mergeStates[i] != minState)
+                    mergeStates[upto++] = mergeStates[i];
+                numFields--;
+                assert upto == numFields;
+              }
+            }
           }
+        } finally {
+          docConsumer.finish();
         }
       }
-
-      docConsumer.finish();
+    } finally {
+      termsConsumer.finish();
     }
-
-    termsConsumer.finish();
   }
 
   final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();