Posted to java-commits@lucene.apache.org by bu...@apache.org on 2007/05/17 14:38:43 UTC
svn commit: r538892 - in /lucene/java/trunk: CHANGES.txt
src/java/org/apache/lucene/index/DocumentWriter.java
src/test/org/apache/lucene/index/TestPayloads.java
Author: buschmi
Date: Thu May 17 05:38:43 2007
New Revision: 538892
URL: http://svn.apache.org/viewvc?view=rev&rev=538892
Log:
LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it has written the postings.
Modified:
lucene/java/trunk/CHANGES.txt
lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java
Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?view=diff&rev=538892&r1=538891&r2=538892
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Thu May 17 05:38:43 2007
@@ -120,6 +120,10 @@
was set has no effect - it is masked by the similarity of the MultiSearcher. This is as
designed, because MultiSearcher operates on Searchables (not Searchers). (Doron Cohen)
+15. LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it
+ has written the postings. Then the resources associated with the
+ TokenStreams can safely be released. (Michael Busch)
+
New features
1. LUCENE-759: Added two n-gram-producing TokenFilters.
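The core of the DocumentWriter change below is the usual close-everything-in-finally idiom: every TokenStream opened during inversion is remembered, and once the postings are written each stream is closed, keeping the first IOException and rethrowing it only after all streams have had a chance to close. A minimal standalone sketch of that idiom, in the pre-generics style of this codebase (the class and method names are illustrative, not part of the commit):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.lucene.analysis.TokenStream;

    final class TokenStreamCloser {

      // Close every stream in the list. If one or more close() calls
      // fail, remember only the first IOException and rethrow it after
      // all streams have been given a chance to close.
      static void closeAll(List streams) throws IOException {
        IOException first = null;
        Iterator it = streams.iterator();
        while (it.hasNext()) {
          try {
            ((TokenStream) it.next()).close();
          } catch (IOException e) {
            if (first == null) {
              first = e;
            }
          }
        }
        streams.clear();
        if (first != null) {
          throw first;
        }
      }
    }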
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java?view=diff&rev=538892&r1=538891&r2=538892
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentWriter.java Thu May 17 05:38:43 2007
@@ -35,6 +35,8 @@
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
final class DocumentWriter {
private Analyzer analyzer;
@@ -84,46 +86,67 @@
fieldBoosts = new float[fieldInfos.size()]; // init fieldBoosts
Arrays.fill(fieldBoosts, doc.getBoost());
- // Before we write the FieldInfos we invert the Document. The reason is that
- // during inversion the TokenStreams of tokenized fields are being processed
- // and we might encounter tokens that have payloads associated with them. In
- // this case we have to update the FieldInfo of the particular field.
- invertDocument(doc);
-
- // sort postingTable into an array
- Posting[] postings = sortPostingTable();
-
- // write field infos
- fieldInfos.write(directory, segment + ".fnm");
-
- // write field values
- FieldsWriter fieldsWriter =
- new FieldsWriter(directory, segment, fieldInfos);
try {
- fieldsWriter.addDocument(doc);
- } finally {
- fieldsWriter.close();
- }
- /*
- for (int i = 0; i < postings.length; i++) {
- Posting posting = postings[i];
- System.out.print(posting.term);
- System.out.print(" freq=" + posting.freq);
- System.out.print(" pos=");
- System.out.print(posting.positions[0]);
- for (int j = 1; j < posting.freq; j++)
- System.out.print("," + posting.positions[j]);
- System.out.println("");
- }
- */
+ // Before we write the FieldInfos we invert the Document. The reason is that
+ // during inversion the TokenStreams of tokenized fields are being processed
+ // and we might encounter tokens that have payloads associated with them. In
+ // this case we have to update the FieldInfo of the particular field.
+ invertDocument(doc);
+
+ // sort postingTable into an array
+ Posting[] postings = sortPostingTable();
+
+ // write field infos
+ fieldInfos.write(directory, segment + ".fnm");
- // write postings
- writePostings(postings, segment);
+ // write field values
+ FieldsWriter fieldsWriter =
+ new FieldsWriter(directory, segment, fieldInfos);
+ try {
+ fieldsWriter.addDocument(doc);
+ } finally {
+ fieldsWriter.close();
+ }
+
+ /*
+ for (int i = 0; i < postings.length; i++) {
+ Posting posting = postings[i];
+ System.out.print(posting.term);
+ System.out.print(" freq=" + posting.freq);
+ System.out.print(" pos=");
+ System.out.print(posting.positions[0]);
+ for (int j = 1; j < posting.freq; j++)
+ System.out.print("," + posting.positions[j]);
+ System.out.println("");
+ }
+ */
- // write norms of indexed fields
- writeNorms(segment);
+ // write postings
+ writePostings(postings, segment);
+ // write norms of indexed fields
+ writeNorms(segment);
+ } finally {
+ // close TokenStreams
+ IOException ex = null;
+
+ Iterator it = openTokenStreams.iterator();
+ while (it.hasNext()) {
+ try {
+ ((TokenStream) it.next()).close();
+ } catch (IOException e) {
+ if (ex == null) {   // remember only the first exception
+ ex = e;
+ }
+ }
+ }
+ openTokenStreams.clear();
+
+ if (ex != null) {
+ throw ex;
+ }
+ }
}
// Keys are Terms, values are Postings.
@@ -137,6 +160,10 @@
// If any of the tokens of a particular field carry a payload
// then we enable payloads for that field.
private BitSet fieldStoresPayloads;
+
+ // Keep references to the token streams. We must close them after
+ // the postings are written to the segment.
+ private List openTokenStreams = new LinkedList();
// Tokenizes the fields of a document into Postings.
private final void invertDocument(Document doc)
@@ -181,42 +208,41 @@
stream = analyzer.tokenStream(fieldName, reader);
}
+ // remember this TokenStream; we must close it later
+ openTokenStreams.add(stream);
+
// reset the TokenStream to the first token
stream.reset();
- try {
- Token lastToken = null;
- for (Token t = stream.next(); t != null; t = stream.next()) {
- position += (t.getPositionIncrement() - 1);
+
+ Token lastToken = null;
+ for (Token t = stream.next(); t != null; t = stream.next()) {
+ position += (t.getPositionIncrement() - 1);
- Payload payload = t.getPayload();
- if (payload != null) {
- // enable payloads for this field
- fieldStoresPayloads.set(fieldNumber);
- }
+ Payload payload = t.getPayload();
+ if (payload != null) {
+ // enable payloads for this field
+ fieldStoresPayloads.set(fieldNumber);
+ }
- TermVectorOffsetInfo termVectorOffsetInfo;
- if (field.isStoreOffsetWithTermVector()) {
- termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset());
- } else {
- termVectorOffsetInfo = null;
- }
- addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo);
+ TermVectorOffsetInfo termVectorOffsetInfo;
+ if (field.isStoreOffsetWithTermVector()) {
+ termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset());
+ } else {
+ termVectorOffsetInfo = null;
+ }
+ addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo);
- lastToken = t;
- if (++length >= maxFieldLength) {
- if (infoStream != null)
- infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens");
- break;
- }
+ lastToken = t;
+ if (++length >= maxFieldLength) {
+ if (infoStream != null)
+ infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens");
+ break;
}
-
- if(lastToken != null)
- offset += lastToken.endOffset() + 1;
-
- } finally {
- stream.close();
}
+
+ if(lastToken != null)
+ offset += lastToken.endOffset() + 1;
}
fieldLengths[fieldNumber] = length; // save field length
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java?view=diff&rev=538892&r1=538891&r2=538892
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestPayloads.java Thu May 17 05:38:43 2007
@@ -20,7 +20,9 @@
import java.io.File;
import java.io.IOException;
import java.io.Reader;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -319,10 +321,15 @@
}
- private byte[] generateRandomData(int n) {
- Random rnd = new Random();
- byte[] data = new byte[n];
+ private static Random rnd = new Random();
+
+ private static void generateRandomData(byte[] data) {
rnd.nextBytes(data);
+ }
+
+ private static byte[] generateRandomData(int n) {
+ byte[] data = new byte[n];
+ generateRandomData(data);
return data;
}
@@ -439,5 +446,107 @@
return nextToken;
}
- }
+ }
+
+ public void testThreadSafety() throws IOException {
+ final int numThreads = 5;
+ final int numDocs = 50;
+ final ByteArrayPool pool = new ByteArrayPool(numThreads, 5);
+
+ Directory dir = new RAMDirectory();
+ final IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ final String field = "test";
+
+ Thread[] ingesters = new Thread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ ingesters[i] = new Thread() {
+ public void run() {
+ try {
+ for (int j = 0; j < numDocs; j++) {
+ Document d = new Document();
+ d.add(new Field(field, new PoolingPayloadTokenStream(pool)));
+ writer.addDocument(d);
+ }
+ } catch (IOException e) {
+ fail(e.toString());
+ }
+ }
+ };
+ ingesters[i].start();
+ }
+
+ for (int i = 0; i < numThreads; i++) {
+ try {
+ ingesters[i].join();
+ } catch (InterruptedException e) {}
+ }
+
+ writer.close();
+ IndexReader reader = IndexReader.open(dir);
+ TermEnum terms = reader.terms();
+ while (terms.next()) {
+ TermPositions tp = reader.termPositions(terms.term());
+ while(tp.next()) {
+ int freq = tp.freq();
+ for (int i = 0; i < freq; i++) {
+ tp.nextPosition();
+ String s = new String(tp.getPayload(new byte[5], 0));
+ assertEquals(s, terms.term().text());
+ }
+ }
+ tp.close();
+ }
+ terms.close();
+ reader.close();
+
+ assertEquals(pool.size(), numThreads);
+ }
+
+ private static class PoolingPayloadTokenStream extends TokenStream {
+ private byte[] payload;
+ private boolean first;
+ private ByteArrayPool pool;
+
+ PoolingPayloadTokenStream(ByteArrayPool pool) {
+ this.pool = pool;
+ payload = pool.get();
+ generateRandomData(payload);
+ first = true;
+ }
+
+ public Token next() throws IOException {
+ if (!first) return null;
+ first = false;   // return exactly one token, then signal end of stream
+ Token t = new Token(new String(payload), 0, 0);
+ t.setPayload(new Payload(payload));
+ return t;
+ }
+
+ public void close() throws IOException {
+ pool.release(payload);
+ }
+
+ }
+
+ private static class ByteArrayPool {
+ private List pool;
+
+ ByteArrayPool(int capacity, int size) {
+ pool = new ArrayList();
+ for (int i = 0; i < capacity; i++) {
+ pool.add(new byte[size]);
+ }
+ }
+
+ synchronized byte[] get() {
+ return (byte[]) pool.remove(0);
+ }
+
+ synchronized void release(byte[] b) {
+ pool.add(b);
+ }
+
+ synchronized int size() {
+ return pool.size();
+ }
+ }
}
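For completeness, this is the usage pattern the fix makes safe: a caller can hand a resource-holding TokenStream directly to a Field (as the new testThreadSafety above does) and rely on the indexer to call close() once the document's postings have been written, even if indexing fails part way. A minimal sketch against the trunk API at this revision; the file name "doc.txt" and the demo class are made up for illustration:

    import java.io.FileReader;
    import java.io.IOException;

    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class PreAnalyzedFieldDemo {
      public static void main(String[] args) throws IOException {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new WhitespaceAnalyzer());

        // Tokenize an external Reader ourselves and pass the TokenStream
        // to the Field directly. With LUCENE-880 applied, DocumentWriter
        // closes the stream after writing the postings, which releases
        // the underlying FileReader ("doc.txt" is a hypothetical input).
        TokenStream stream = new WhitespaceAnalyzer()
            .tokenStream("contents", new FileReader("doc.txt"));

        Document doc = new Document();
        doc.add(new Field("contents", stream));
        writer.addDocument(doc);   // the stream is closed in here now
        writer.close();
      }
    }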