Posted to java-commits@lucene.apache.org by bu...@apache.org on 2007/05/17 14:38:43 UTC

svn commit: r538892 - in /lucene/java/trunk: CHANGES.txt src/java/org/apache/lucene/index/DocumentWriter.java src/test/org/apache/lucene/index/TestPayloads.java

Author: buschmi
Date: Thu May 17 05:38:43 2007
New Revision: 538892

URL: http://svn.apache.org/viewvc?view=rev&rev=538892
Log:
LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it has written the postings.

Modified:
    lucene/java/trunk/CHANGES.txt
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java
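
In essence, the patch defers TokenStream cleanup until after the postings have been written: DocumentWriter now remembers every stream it opens during inversion and closes them all in a single finally block, instead of closing each stream inside the inversion loop (where payload bytes handed out by a stream could be recycled before the postings consumed them). A minimal sketch of the new control flow, with hypothetical helper names rather than the actual DocumentWriter internals:

    List openStreams = new LinkedList();      // filled while inverting the document
    try {
        invertDocument(doc);                  // opens streams; tokens may carry Payloads
        writePostings(postings, segment);     // payload bytes are consumed only here
    } finally {
        // only now is it safe for the streams to release their resources
        for (Iterator it = openStreams.iterator(); it.hasNext(); ) {
            ((TokenStream) it.next()).close();
        }
        openStreams.clear();
    }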

Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?view=diff&rev=538892&r1=538891&r2=538892
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Thu May 17 05:38:43 2007
@@ -120,6 +120,10 @@
     was set has no effect - it is masked by the similarity of the MultiSearcher. This is as 
     designed, because MultiSearcher operates on Searchables (not Searchers). (Doron Cohen)
 
+15. LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it
+    has written the postings. Then the resources associated with the 
+    TokenStreams can safely be released. (Michael Busch)
+
 New features
 
  1. LUCENE-759: Added two n-gram-producing TokenFilters.

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java?view=diff&rev=538892&r1=538891&r2=538892
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java Thu May 17 05:38:43 2007
@@ -35,6 +35,8 @@
 import java.util.Enumeration;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
 final class DocumentWriter {
   private Analyzer analyzer;
@@ -84,46 +86,67 @@
     fieldBoosts = new float[fieldInfos.size()];	  // init fieldBoosts
     Arrays.fill(fieldBoosts, doc.getBoost());
 
-    // Before we write the FieldInfos we invert the Document. The reason is that
-    // during inversion the TokenStreams of tokenized fields are being processed 
-    // and we might encounter tokens that have payloads associated with them. In 
-    // this case we have to update the FieldInfo of the particular field.
-    invertDocument(doc);
-
-    // sort postingTable into an array
-    Posting[] postings = sortPostingTable();
-    
-    // write field infos 
-    fieldInfos.write(directory, segment + ".fnm");
-
-    // write field values
-    FieldsWriter fieldsWriter =
-            new FieldsWriter(directory, segment, fieldInfos);
     try {
-      fieldsWriter.addDocument(doc);
-    } finally {
-      fieldsWriter.close();
-    }
     
-    /*
-    for (int i = 0; i < postings.length; i++) {
-      Posting posting = postings[i];
-      System.out.print(posting.term);
-      System.out.print(" freq=" + posting.freq);
-      System.out.print(" pos=");
-      System.out.print(posting.positions[0]);
-      for (int j = 1; j < posting.freq; j++)
-	System.out.print("," + posting.positions[j]);
-      System.out.println("");
-    }
-    */
+      // Before we write the FieldInfos we invert the Document. The reason is that
+      // during inversion the TokenStreams of tokenized fields are being processed 
+      // and we might encounter tokens that have payloads associated with them. In 
+      // this case we have to update the FieldInfo of the particular field.
+      invertDocument(doc);
+    
+      // sort postingTable into an array
+      Posting[] postings = sortPostingTable();
+    
+      // write field infos 
+      fieldInfos.write(directory, segment + ".fnm");
 
-    // write postings
-    writePostings(postings, segment);
+      // write field values
+      FieldsWriter fieldsWriter =
+        new FieldsWriter(directory, segment, fieldInfos);
+      try {
+        fieldsWriter.addDocument(doc);
+      } finally {
+        fieldsWriter.close();
+      }
+    
+      /*
+      for (int i = 0; i < postings.length; i++) {
+        Posting posting = postings[i];
+        System.out.print(posting.term);
+        System.out.print(" freq=" + posting.freq);
+        System.out.print(" pos=");
+        System.out.print(posting.positions[0]);
+        for (int j = 1; j < posting.freq; j++)
+          System.out.print("," + posting.positions[j]);
+        System.out.println("");
+      }
+      */
 
-    // write norms of indexed fields
-    writeNorms(segment);
+      // write postings
+      writePostings(postings, segment);
 
+      // write norms of indexed fields
+      writeNorms(segment);
+    } finally {
+      // close TokenStreams
+      IOException ex = null;
+      
+      Iterator it = openTokenStreams.iterator();
+      while (it.hasNext()) {
+        try {
+          ((TokenStream) it.next()).close();
+        } catch (IOException e) {
+          if (ex == null) {
+            ex = e;
+          }
+        }
+      }
+      openTokenStreams.clear();
+      
+      if (ex != null) {
+        throw ex;
+      }
+    }
   }
 
   // Keys are Terms, values are Postings.
@@ -137,6 +160,10 @@
 // If any of the tokens of a particular field carry a payload
   // then we enable payloads for that field. 
   private BitSet fieldStoresPayloads;
+  
+  // Keep references of the token streams. We must close them after
+  // the postings are written to the segment.
+  private List openTokenStreams = new LinkedList();
 
   // Tokenizes the fields of a document into Postings.
   private final void invertDocument(Document doc)
@@ -181,42 +208,41 @@
             stream = analyzer.tokenStream(fieldName, reader);
           }
           
+          // remember this TokenStream, we must close it later
+          openTokenStreams.add(stream);
+          
           // reset the TokenStream to the first token
           stream.reset();
           
-          try {
-            Token lastToken = null;
-            for (Token t = stream.next(); t != null; t = stream.next()) {
-              position += (t.getPositionIncrement() - 1);
+
+          Token lastToken = null;
+          for (Token t = stream.next(); t != null; t = stream.next()) {
+            position += (t.getPositionIncrement() - 1);
               
-              Payload payload = t.getPayload();
-              if (payload != null) {
-                // enable payloads for this field
-              	fieldStoresPayloads.set(fieldNumber);
-              }
+            Payload payload = t.getPayload();
+            if (payload != null) {
+              // enable payloads for this field
+              fieldStoresPayloads.set(fieldNumber);
+            }
               
-              TermVectorOffsetInfo termVectorOffsetInfo;
-              if (field.isStoreOffsetWithTermVector()) {
-                termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset());
-              } else {
-                termVectorOffsetInfo = null;
-              }
-              addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo);
+            TermVectorOffsetInfo termVectorOffsetInfo;
+            if (field.isStoreOffsetWithTermVector()) {
+              termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset());
+            } else {
+              termVectorOffsetInfo = null;
+            }
+            addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo);
               
-              lastToken = t;
-              if (++length >= maxFieldLength) {
-                if (infoStream != null)
-                  infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens");
-                break;
-              }
+            lastToken = t;
+            if (++length >= maxFieldLength) {
+              if (infoStream != null)
+                infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens");
+              break;
             }
-            
-            if(lastToken != null)
-              offset += lastToken.endOffset() + 1;
-            
-          } finally {
-            stream.close();
           }
+            
+          if (lastToken != null)
+            offset += lastToken.endOffset() + 1;
         }
 
         fieldLengths[fieldNumber] = length;	  // save field length
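
The cleanup idiom in the new finally block is worth a note: every collected stream gets a close() attempt even if an earlier close() throws, and only the first IOException is rethrown once the loop finishes, so a single failing stream cannot keep the others from releasing their resources. Extracted into a standalone helper, in the patch's pre-generics style, it might look like this (a sketch; closeAll is not part of the commit):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.lucene.analysis.TokenStream;

    final class StreamCloser {
        // Try to close every stream; remember only the first failure
        // and rethrow it after all streams have been attempted.
        static void closeAll(List streams) throws IOException {
            IOException first = null;
            for (Iterator it = streams.iterator(); it.hasNext(); ) {
                try {
                    ((TokenStream) it.next()).close();
                } catch (IOException e) {
                    if (first == null) {
                        first = e;
                    }
                }
            }
            streams.clear();
            if (first != null) {
                throw first;
            }
        }
    }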

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java?view=diff&rev=538892&r1=538891&r2=538892
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java Thu May 17 05:38:43 2007
@@ -20,7 +20,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Reader;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -319,10 +321,15 @@
         
     }
     
-    private byte[] generateRandomData(int n) {
-        Random rnd = new Random();
-        byte[] data = new byte[n];
+    private static Random rnd = new Random();
+    
+    private static void generateRandomData(byte[] data) {
         rnd.nextBytes(data);
+    }
+
+    private static byte[] generateRandomData(int n) {
+        byte[] data = new byte[n];
+        generateRandomData(data);
         return data;
     }
     
@@ -439,5 +446,108 @@
             
             return nextToken;
         }
-      }
+    }
+    
+    public void testThreadSafety() throws IOException {
+        final int numThreads = 5;
+        final int numDocs = 50;
+        final ByteArrayPool pool = new ByteArrayPool(numThreads, 5);
+        
+        Directory dir = new RAMDirectory();
+        final IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+        final String field = "test";
+        
+        Thread[] ingesters = new Thread[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            ingesters[i] = new Thread() {
+                public void run() {
+                    try {
+                        for (int j = 0; j < numDocs; j++) {
+                            Document d = new Document();
+                            d.add(new Field(field, new PoolingPayloadTokenStream(pool)));
+                            writer.addDocument(d);
+                        }
+                    } catch (IOException e) {
+                        fail(e.toString());
+                    }
+                }
+            };
+            ingesters[i].start();
+        }
+        
+        for (int i = 0; i < numThreads; i++) {
+            try {
+                ingesters[i].join();
+            } catch (InterruptedException e) {}
+        }
+        
+        writer.close();
+        IndexReader reader = IndexReader.open(dir);
+        TermEnum terms = reader.terms();
+        while (terms.next()) {
+            TermPositions tp = reader.termPositions(terms.term());
+            while(tp.next()) {
+                int freq = tp.freq();
+                for (int i = 0; i < freq; i++) {
+                    tp.nextPosition();
+                    String s = new String(tp.getPayload(new byte[5], 0));
+                    assertEquals(s, terms.term().text);
+                }
+            }
+            tp.close();
+        }
+        terms.close();
+        reader.close();
+        
+        assertEquals(pool.size(), numThreads);
+    }
+    
+    private static class PoolingPayloadTokenStream extends TokenStream {
+        private byte[] payload;
+        private boolean first;
+        private ByteArrayPool pool;
+        
+        PoolingPayloadTokenStream(ByteArrayPool pool) {
+            this.pool = pool;
+            payload = pool.get();
+            generateRandomData(payload);
+            first = true;
+        }
+        
+        public Token next() throws IOException {
+            if (!first) return null;
+            first = false;  // emit exactly one token per stream
+            Token t = new Token(new String(payload), 0, 0);
+            t.setPayload(new Payload(payload));
+            return t;
+        }
+        
+        public void close() throws IOException {
+            pool.release(payload);
+        }
+        
+    }
+    
+    private static class ByteArrayPool {
+        private List pool;
+        
+        ByteArrayPool(int capacity, int size) {
+            pool = new ArrayList();
+            for (int i = 0; i < capacity; i++) {
+                pool.add(new byte[size]);
+            }
+        }
+        
+        synchronized byte[] get() {
+            return (byte[]) pool.remove(0);
+        }
+        
+        synchronized void release(byte[] b) {
+            pool.add(b);
+        }
+        
+        synchronized int size() {
+            return pool.size();
+        }
+    }
 }
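
The closing assertion is the point of the test: each PoolingPayloadTokenStream borrows one byte[] from the pool in its constructor and returns it only in close(), and the pool's capacity equals numThreads, so pool.size() can only be back at numThreads at the end if DocumentWriter closed every stream it was handed. Without the fix, the arrays would leak and ByteArrayPool.get() would eventually fail with an IndexOutOfBoundsException from remove(0) on an empty list. A slightly more defensive get() would make that failure mode explicit (a sketch, not part of the patch):

    synchronized byte[] get() {
        // With the fix, each ingester thread holds at most one array at a
        // time, so the pool can never run dry; without it, arrays leak.
        if (pool.isEmpty()) {
            throw new IllegalStateException("pool exhausted: a TokenStream was not closed");
        }
        return (byte[]) pool.remove(0);
    }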