Posted to commits@lucene.apache.org by bu...@apache.org on 2010/07/21 12:27:22 UTC
svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/ branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/ trunk/lucen...
Author: buschmi
Date: Wed Jul 21 10:27:20 2010
New Revision: 966168
URL: http://svn.apache.org/viewvc?rev=966168&view=rev
Log:
LUCENE-2324: Committing second version of the patch to the real-time branch. It's not done yet, but easier to track progress using the branch.
Added:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/util/ThreadSafeCloneableSortedMap.java
Removed:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
Modified:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestByteSlices.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,70 @@
+package org.apache.lucene.index;
+
+import java.util.TreeMap;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.ThreadSafeCloneableSortedMap;
+
+public class BufferedDeletesInRAM {
+ static class Delete {
+ int flushCount;
+
+ public Delete(int flushCount) {
+ this.flushCount = flushCount;
+ }
+ }
+
+ final static class DeleteTerm extends Delete {
+ final Term term;
+
+ public DeleteTerm(Term term, int flushCount) {
+ super(flushCount);
+ this.term = term;
+ }
+ }
+
+ final static class DeleteTerms extends Delete {
+ final Term[] terms;
+
+ public DeleteTerms(Term[] terms, int flushCount) {
+ super(flushCount);
+ this.terms = terms;
+ }
+ }
+
+ final static class DeleteQuery extends Delete {
+ final Query query;
+
+ public DeleteQuery(Query query, int flushCount) {
+ super(flushCount);
+ this.query = query;
+ }
+ }
+
+ final ThreadSafeCloneableSortedMap<Long, Delete> deletes = ThreadSafeCloneableSortedMap
+ .getThreadSafeSortedMap(new TreeMap<Long, Delete>());
+
+ final void addDeleteTerm(Term term, long sequenceID, int numThreadStates) {
+ deletes.put(sequenceID, new DeleteTerm(term, numThreadStates));
+ }
+
+ final void addDeleteTerms(Term[] terms, long sequenceID, int numThreadStates) {
+ deletes.put(sequenceID, new DeleteTerms(terms, numThreadStates));
+ }
+
+ final void addDeleteQuery(Query query, long sequenceID, int numThreadStates) {
+ deletes.put(sequenceID, new DeleteQuery(query, numThreadStates));
+ }
+
+ boolean hasDeletes() {
+ return !deletes.isEmpty();
+ }
+
+ void clear() {
+ deletes.clear();
+ }
+
+ int getNumDeletes() {
+ return this.deletes.size();
+ }
+}
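
[Editor's sketch, not part of this commit: the class above only buffers deletes and never applies them. A minimal usage sketch, with assumed caller names and values, shows the intended contract: each delete is keyed by the global sequence ID it was assigned, numThreadStates records how many per-thread writers existed at that point, and a flush would replay the sorted map and then clear it. The sketch sits in the same package because the buffer's methods are package-private.]

package org.apache.lucene.index;  // same package: addDeleteTerm() etc. are package-private

public class BufferedDeletesSketch {
  public static void main(String[] args) {
    BufferedDeletesInRAM buffered = new BufferedDeletesInRAM();

    long sequenceID = 42L;     // assumed: handed out by a global counter in DocumentsWriter
    int numThreadStates = 4;   // assumed: active per-thread writer count when the delete arrived

    buffered.addDeleteTerm(new Term("id", "doc-17"), sequenceID, numThreadStates);

    if (buffered.hasDeletes()) {
      System.out.println("pending deletes: " + buffered.getNumDeletes());
      // a flush would walk the map in sequence-ID order, apply each delete, then:
      buffered.clear();
    }
  }
}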
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java Wed Jul 21 10:27:20 2010
@@ -50,10 +50,10 @@ final class ByteBlockPool {
public byte[][] buffers = new byte[10][];
int bufferUpto = -1; // Which buffer we are upto
- public int byteUpto = DocumentsWriter.BYTE_BLOCK_SIZE; // Where we are in head buffer
+ public int byteUpto = DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE; // Where we are in head buffer
public byte[] buffer; // Current head buffer
- public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE; // Current head offset
+ public int byteOffset = -DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE; // Current head offset
private final Allocator allocator;
@@ -95,11 +95,11 @@ final class ByteBlockPool {
bufferUpto++;
byteUpto = 0;
- byteOffset += DocumentsWriter.BYTE_BLOCK_SIZE;
+ byteOffset += DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
}
public int newSlice(final int size) {
- if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-size)
+ if (byteUpto > DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-size)
nextBuffer();
final int upto = byteUpto;
byteUpto += size;
@@ -123,7 +123,7 @@ final class ByteBlockPool {
final int newSize = levelSizeArray[newLevel];
// Maybe allocate another block
- if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-newSize)
+ if (byteUpto > DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-newSize)
nextBuffer();
final int newUpto = byteUpto;
@@ -151,8 +151,8 @@ final class ByteBlockPool {
// Fill in a BytesRef from term's length & bytes encoded in
// byte block
final BytesRef setBytesRef(BytesRef term, int textStart) {
- final byte[] bytes = term.bytes = buffers[textStart >> DocumentsWriter.BYTE_BLOCK_SHIFT];
- int pos = textStart & DocumentsWriter.BYTE_BLOCK_MASK;
+ final byte[] bytes = term.bytes = buffers[textStart >> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
+ int pos = textStart & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
if ((bytes[pos] & 0x80) == 0) {
// length is 1 byte
term.length = bytes[pos];
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java Wed Jul 21 10:27:20 2010
@@ -48,16 +48,16 @@ final class ByteSliceReader extends Data
this.endIndex = endIndex;
level = 0;
- bufferUpto = startIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
- bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
+ bufferUpto = startIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
+ bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
buffer = pool.buffers[bufferUpto];
- upto = startIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+ upto = startIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
final int firstSize = ByteBlockPool.levelSizeArray[0];
if (startIndex+firstSize >= endIndex) {
// There is only this one slice to read
- limit = endIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+ limit = endIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
} else
limit = upto+firstSize-4;
}
@@ -102,11 +102,11 @@ final class ByteSliceReader extends Data
level = ByteBlockPool.nextLevelArray[level];
final int newSize = ByteBlockPool.levelSizeArray[level];
- bufferUpto = nextIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
- bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
+ bufferUpto = nextIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
+ bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
buffer = pool.buffers[bufferUpto];
- upto = nextIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+ upto = nextIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
if (nextIndex + newSize >= endIndex) {
// We are advancing to the final slice
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Wed Jul 21 10:27:20 2010
@@ -42,9 +42,9 @@ final class ByteSliceWriter extends Data
* Set up the writer to write at address.
*/
public void init(int address) {
- slice = pool.buffers[address >> DocumentsWriter.BYTE_BLOCK_SHIFT];
+ slice = pool.buffers[address >> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
assert slice != null;
- upto = address & DocumentsWriter.BYTE_BLOCK_MASK;
+ upto = address & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
offset0 = address;
assert upto < slice.length;
}
@@ -80,6 +80,6 @@ final class ByteSliceWriter extends Data
}
public int getAddress() {
- return upto + (offset0 & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
+ return upto + (offset0 & DocumentsWriterRAMAllocator.BYTE_BLOCK_NOT_MASK);
}
}
\ No newline at end of file
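
[Editor's sketch, not part of this commit: the three changes above are purely mechanical, moving the BYTE_BLOCK_* constants from DocumentsWriter to the new DocumentsWriterRAMAllocator; the addressing scheme itself is unchanged. As a reminder of what those constants encode, the sketch below splits a flat pool address into a buffer index and an offset inside that buffer, the same way ByteSliceReader/ByteSliceWriter do. The constant values are assumed (Lucene has historically used 32 KB byte blocks).]

class ByteBlockAddressingSketch {
  // Assumed values; the real constants now live on DocumentsWriterRAMAllocator.
  static final int BYTE_BLOCK_SHIFT = 15;
  static final int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;  // 32768 bytes per block
  static final int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;

  // ByteSliceWriter.init / ByteSliceReader.init decompose a flat address like this:
  static int bufferIndex(int address)  { return address >> BYTE_BLOCK_SHIFT; } // which byte[] in the pool
  static int bufferOffset(int address) { return address & BYTE_BLOCK_MASK; }   // position inside that byte[]

  public static void main(String[] args) {
    int address = 3 * BYTE_BLOCK_SIZE + 100;  // 100 bytes into the fourth buffer
    System.out.println(bufferIndex(address) + " / " + bufferOffset(address));  // prints: 3 / 100
  }
}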
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Wed Jul 21 10:27:20 2010
@@ -18,11 +18,10 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import java.util.Collection;
abstract class DocConsumer {
- abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
- abstract void flush(final Collection<DocConsumerPerThread> threads, final SegmentWriteState state) throws IOException;
+ abstract DocumentsWriterPerThread.DocWriter processDocument() throws IOException;
+ abstract void flush(final SegmentWriteState state) throws IOException;
abstract void closeDocStore(final SegmentWriteState state) throws IOException;
abstract void abort();
abstract boolean freeRAM();
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Wed Jul 21 10:27:20 2010
@@ -18,7 +18,6 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
abstract class DocFieldConsumer {
@@ -27,7 +26,7 @@ abstract class DocFieldConsumer {
/** Called when DocumentsWriter decides to create a new
* segment */
- abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+ abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
/** Called when DocumentsWriter decides to close the doc
* stores */
@@ -36,14 +35,17 @@ abstract class DocFieldConsumer {
/** Called when an aborting exception is hit */
abstract void abort();
- /** Add a new thread */
- abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
-
/** Called when DocumentsWriter is using too much RAM.
* The consumer should free RAM, if possible, returning
* true if any RAM was in fact freed. */
abstract boolean freeRAM();
+
+ abstract void startDocument() throws IOException;
+ abstract DocFieldConsumerPerField addField(FieldInfo fi);
+
+ abstract DocumentsWriterPerThread.DocWriter finishDocument() throws IOException;
+
void setFieldInfos(FieldInfos fieldInfos) {
this.fieldInfos = fieldInfos;
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Wed Jul 21 10:27:20 2010
@@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField
/** Processes all occurrences of a single field */
abstract void processFields(Fieldable[] fields, int count) throws IOException;
abstract void abort();
+ abstract FieldInfo getFieldInfo();
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Wed Jul 21 10:27:20 2010
@@ -17,12 +17,9 @@ package org.apache.lucene.index;
* limitations under the License.
*/
+import java.io.IOException;
import java.util.HashMap;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
-import java.util.HashSet;
-import java.io.IOException;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
@@ -33,10 +30,12 @@ import org.apache.lucene.util.RamUsageEs
final class DocFieldConsumers extends DocFieldConsumer {
final DocFieldConsumer one;
final DocFieldConsumer two;
+ final DocumentsWriterPerThread.DocState docState;
- public DocFieldConsumers(DocFieldConsumer one, DocFieldConsumer two) {
+ public DocFieldConsumers(DocFieldProcessor processor, DocFieldConsumer one, DocFieldConsumer two) {
this.one = one;
this.two = two;
+ this.docState = processor.docState;
}
@Override
@@ -47,33 +46,19 @@ final class DocFieldConsumers extends Do
}
@Override
- public void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
- Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> oneThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
- Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> twoThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
-
- for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet()) {
+ public void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
- final DocFieldConsumersPerThread perThread = (DocFieldConsumersPerThread) entry.getKey();
+ Map<FieldInfo, DocFieldConsumerPerField> oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+ Map<FieldInfo, DocFieldConsumerPerField> twoFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
- final Collection<DocFieldConsumerPerField> fields = entry.getValue();
-
- Iterator<DocFieldConsumerPerField> fieldsIt = fields.iterator();
- Collection<DocFieldConsumerPerField> oneFields = new HashSet<DocFieldConsumerPerField>();
- Collection<DocFieldConsumerPerField> twoFields = new HashSet<DocFieldConsumerPerField>();
- while(fieldsIt.hasNext()) {
- DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldsIt.next();
- oneFields.add(perField.one);
- twoFields.add(perField.two);
- }
-
- oneThreadsAndFields.put(perThread.one, oneFields);
- twoThreadsAndFields.put(perThread.two, twoFields);
+ for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+ DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldToFlush.getValue();
+ oneFieldsToFlush.put(fieldToFlush.getKey(), perField.one);
+ twoFieldsToFlush.put(fieldToFlush.getKey(), perField.two);
}
-
- one.flush(oneThreadsAndFields, state);
- two.flush(twoThreadsAndFields, state);
+ one.flush(oneFieldsToFlush, state);
+ two.flush(twoFieldsToFlush, state);
}
@Override
@@ -101,16 +86,11 @@ final class DocFieldConsumers extends Do
return any;
}
- @Override
- public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException {
- return new DocFieldConsumersPerThread(docFieldProcessorPerThread, this, one.addThread(docFieldProcessorPerThread), two.addThread(docFieldProcessorPerThread));
- }
-
PerDoc[] docFreeList = new PerDoc[1];
int freeCount;
int allocCount;
- synchronized PerDoc getPerDoc() {
+ PerDoc getPerDoc() {
if (freeCount == 0) {
allocCount++;
if (allocCount > docFreeList.length) {
@@ -125,15 +105,15 @@ final class DocFieldConsumers extends Do
return docFreeList[--freeCount];
}
- synchronized void freePerDoc(PerDoc perDoc) {
+ void freePerDoc(PerDoc perDoc) {
assert freeCount < docFreeList.length;
docFreeList[freeCount++] = perDoc;
}
- class PerDoc extends DocumentsWriter.DocWriter {
+ class PerDoc extends DocumentsWriterPerThread.DocWriter {
- DocumentsWriter.DocWriter writerOne;
- DocumentsWriter.DocWriter writerTwo;
+ DocumentsWriterPerThread.DocWriter writerOne;
+ DocumentsWriterPerThread.DocWriter writerTwo;
@Override
public long sizeInBytes() {
@@ -166,4 +146,35 @@ final class DocFieldConsumers extends Do
}
}
}
+
+ @Override
+ public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
+ final DocumentsWriterPerThread.DocWriter oneDoc = one.finishDocument();
+ final DocumentsWriterPerThread.DocWriter twoDoc = two.finishDocument();
+ if (oneDoc == null)
+ return twoDoc;
+ else if (twoDoc == null)
+ return oneDoc;
+ else {
+ DocFieldConsumers.PerDoc both = getPerDoc();
+ both.docID = docState.docID;
+ assert oneDoc.docID == docState.docID;
+ assert twoDoc.docID == docState.docID;
+ both.writerOne = oneDoc;
+ both.writerTwo = twoDoc;
+ return both;
+ }
+ }
+
+ @Override
+ public void startDocument() throws IOException {
+ one.startDocument();
+ two.startDocument();
+ }
+
+ @Override
+ public DocFieldConsumerPerField addField(FieldInfo fi) {
+ return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
+ }
+
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java Wed Jul 21 10:27:20 2010
@@ -24,12 +24,14 @@ final class DocFieldConsumersPerField ex
final DocFieldConsumerPerField one;
final DocFieldConsumerPerField two;
- final DocFieldConsumersPerThread perThread;
+ final DocFieldConsumers parent;
+ final FieldInfo fieldInfo;
- public DocFieldConsumersPerField(DocFieldConsumersPerThread perThread, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
- this.perThread = perThread;
+ public DocFieldConsumersPerField(DocFieldConsumers parent, FieldInfo fi, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
+ this.parent = parent;
this.one = one;
this.two = two;
+ this.fieldInfo = fi;
}
@Override
@@ -46,4 +48,9 @@ final class DocFieldConsumersPerField ex
two.abort();
}
}
+
+ @Override
+ FieldInfo getFieldInfo() {
+ return fieldInfo;
+ }
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Wed Jul 21 10:27:20 2010
@@ -19,8 +19,15 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
-import java.util.Map;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
/**
@@ -33,13 +40,27 @@ import java.util.HashMap;
final class DocFieldProcessor extends DocConsumer {
- final DocumentsWriter docWriter;
final FieldInfos fieldInfos = new FieldInfos();
final DocFieldConsumer consumer;
final StoredFieldsWriter fieldsWriter;
- public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
- this.docWriter = docWriter;
+ // Holds all fields seen in current doc
+ DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
+ int fieldCount;
+
+ // Hash table for all fields ever seen
+ DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
+ int hashMask = 1;
+ int totalFieldCount;
+
+
+ float docBoost;
+ int fieldGen;
+ final DocumentsWriterPerThread.DocState docState;
+
+
+ public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
+ this.docState = docWriter.docState;
this.consumer = consumer;
consumer.setFieldInfos(fieldInfos);
fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
@@ -52,16 +73,17 @@ final class DocFieldProcessor extends Do
}
@Override
- public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
+ public void flush(SegmentWriteState state) throws IOException {
- Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
- for ( DocConsumerPerThread thread : threads) {
- DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
- childThreadsAndFields.put(perThread.consumer, perThread.fields());
- perThread.trimFields(state);
+ Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+ Collection<DocFieldConsumerPerField> fields = fields();
+ for (DocFieldConsumerPerField f : fields) {
+ childFields.put(f.getFieldInfo(), f);
}
+ trimFields(state);
+
fieldsWriter.flush(state);
- consumer.flush(childThreadsAndFields, state);
+ consumer.flush(childFields, state);
// Important to save after asking consumer to flush so
// consumer can alter the FieldInfo* if necessary. EG,
@@ -74,6 +96,15 @@ final class DocFieldProcessor extends Do
@Override
public void abort() {
+ for(int i=0;i<fieldHash.length;i++) {
+ DocFieldProcessorPerField field = fieldHash[i];
+ while(field != null) {
+ final DocFieldProcessorPerField next = field.next;
+ field.abort();
+ field = next;
+ }
+ }
+
fieldsWriter.abort();
consumer.abort();
}
@@ -82,9 +113,317 @@ final class DocFieldProcessor extends Do
public boolean freeRAM() {
return consumer.freeRAM();
}
+
+ public Collection<DocFieldConsumerPerField> fields() {
+ Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
+ for(int i=0;i<fieldHash.length;i++) {
+ DocFieldProcessorPerField field = fieldHash[i];
+ while(field != null) {
+ fields.add(field.consumer);
+ field = field.next;
+ }
+ }
+ assert fields.size() == totalFieldCount;
+ return fields;
+ }
+
+ /** If there are fields we've seen but did not see again
+ * in the last run, then free them up. */
+
+ void trimFields(SegmentWriteState state) {
+
+ for(int i=0;i<fieldHash.length;i++) {
+ DocFieldProcessorPerField perField = fieldHash[i];
+ DocFieldProcessorPerField lastPerField = null;
+
+ while (perField != null) {
+
+ if (perField.lastGen == -1) {
+
+ // This field was not seen since the previous
+ // flush, so, free up its resources now
+
+ // Unhash
+ if (lastPerField == null)
+ fieldHash[i] = perField.next;
+ else
+ lastPerField.next = perField.next;
+
+ if (state.infoStream != null) {
+ state.infoStream.println(" purge field=" + perField.fieldInfo.name);
+ }
+
+ totalFieldCount--;
+
+ } else {
+ // Reset
+ perField.lastGen = -1;
+ lastPerField = perField;
+ }
+
+ perField = perField.next;
+ }
+ }
+ }
+
+ private void rehash() {
+ final int newHashSize = (fieldHash.length*2);
+ assert newHashSize > fieldHash.length;
+
+ final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
+
+ // Rehash
+ int newHashMask = newHashSize-1;
+ for(int j=0;j<fieldHash.length;j++) {
+ DocFieldProcessorPerField fp0 = fieldHash[j];
+ while(fp0 != null) {
+ final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
+ DocFieldProcessorPerField nextFP0 = fp0.next;
+ fp0.next = newHashArray[hashPos2];
+ newHashArray[hashPos2] = fp0;
+ fp0 = nextFP0;
+ }
+ }
+
+ fieldHash = newHashArray;
+ hashMask = newHashMask;
+ }
@Override
- public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
- return new DocFieldProcessorPerThread(threadState, this);
+ public DocumentsWriterPerThread.DocWriter processDocument() throws IOException {
+
+ consumer.startDocument();
+ fieldsWriter.startDocument();
+
+ final Document doc = docState.doc;
+
+ fieldCount = 0;
+
+ final int thisFieldGen = fieldGen++;
+
+ final List<Fieldable> docFields = doc.getFields();
+ final int numDocFields = docFields.size();
+
+ // Absorb any new fields first seen in this document.
+ // Also absorb any changes to fields we had already
+ // seen before (eg suddenly turning on norms or
+ // vectors, etc.):
+
+ for(int i=0;i<numDocFields;i++) {
+ Fieldable field = docFields.get(i);
+ final String fieldName = field.name();
+
+ // Make sure we have a PerField allocated
+ final int hashPos = fieldName.hashCode() & hashMask;
+ DocFieldProcessorPerField fp = fieldHash[hashPos];
+ while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
+ fp = fp.next;
+ }
+
+ if (fp == null) {
+
+ // TODO FI: we need to genericize the "flags" that a
+ // field holds, and, how these flags are merged; it
+ // needs to be more "pluggable" such that if I want
+ // to have a new "thing" my Fields can do, I can
+ // easily add it
+ FieldInfo fi = fieldInfos.add(fieldName, field.isIndexed(), field.isTermVectorStored(),
+ field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+ field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+
+ fp = new DocFieldProcessorPerField(this, fi);
+ fp.next = fieldHash[hashPos];
+ fieldHash[hashPos] = fp;
+ totalFieldCount++;
+
+ if (totalFieldCount >= fieldHash.length/2)
+ rehash();
+ } else {
+ fp.fieldInfo.update(field.isIndexed(), field.isTermVectorStored(),
+ field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+ field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+ }
+
+ if (thisFieldGen != fp.lastGen) {
+
+ // First time we're seeing this field for this doc
+ fp.fieldCount = 0;
+
+ if (fieldCount == fields.length) {
+ final int newSize = fields.length*2;
+ DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
+ System.arraycopy(fields, 0, newArray, 0, fieldCount);
+ fields = newArray;
+ }
+
+ fields[fieldCount++] = fp;
+ fp.lastGen = thisFieldGen;
+ }
+
+ if (fp.fieldCount == fp.fields.length) {
+ Fieldable[] newArray = new Fieldable[fp.fields.length*2];
+ System.arraycopy(fp.fields, 0, newArray, 0, fp.fieldCount);
+ fp.fields = newArray;
+ }
+
+ fp.fields[fp.fieldCount++] = field;
+ if (field.isStored()) {
+ fieldsWriter.addField(field, fp.fieldInfo);
+ }
+ }
+
+ // If we are writing vectors then we must visit
+ // fields in sorted order so they are written in
+ // sorted order. TODO: we actually only need to
+ // sort the subset of fields that have vectors
+ // enabled; we could save [small amount of] CPU
+ // here.
+ quickSort(fields, 0, fieldCount-1);
+
+ for(int i=0;i<fieldCount;i++)
+ fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
+
+ if (docState.maxTermPrefix != null && docState.infoStream != null) {
+ docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
+ docState.maxTermPrefix = null;
+ }
+
+ final DocumentsWriterPerThread.DocWriter one = fieldsWriter.finishDocument();
+ final DocumentsWriterPerThread.DocWriter two = consumer.finishDocument();
+ if (one == null) {
+ return two;
+ } else if (two == null) {
+ return one;
+ } else {
+ PerDoc both = getPerDoc();
+ both.docID = docState.docID;
+ assert one.docID == docState.docID;
+ assert two.docID == docState.docID;
+ both.one = one;
+ both.two = two;
+ return both;
+ }
+ }
+
+ void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
+ if (lo >= hi)
+ return;
+ else if (hi == 1+lo) {
+ if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+ final DocFieldProcessorPerField tmp = array[lo];
+ array[lo] = array[hi];
+ array[hi] = tmp;
+ }
+ return;
+ }
+
+ int mid = (lo + hi) >>> 1;
+
+ if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+ DocFieldProcessorPerField tmp = array[lo];
+ array[lo] = array[mid];
+ array[mid] = tmp;
+ }
+
+ if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+ DocFieldProcessorPerField tmp = array[mid];
+ array[mid] = array[hi];
+ array[hi] = tmp;
+
+ if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+ DocFieldProcessorPerField tmp2 = array[lo];
+ array[lo] = array[mid];
+ array[mid] = tmp2;
+ }
+ }
+
+ int left = lo + 1;
+ int right = hi - 1;
+
+ if (left >= right)
+ return;
+
+ DocFieldProcessorPerField partition = array[mid];
+
+ for (; ;) {
+ while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
+ --right;
+
+ while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
+ ++left;
+
+ if (left < right) {
+ DocFieldProcessorPerField tmp = array[left];
+ array[left] = array[right];
+ array[right] = tmp;
+ --right;
+ } else {
+ break;
+ }
+ }
+
+ quickSort(array, lo, left);
+ quickSort(array, left + 1, hi);
+ }
+
+ PerDoc[] docFreeList = new PerDoc[1];
+ int freeCount;
+ int allocCount;
+
+ PerDoc getPerDoc() {
+ if (freeCount == 0) {
+ allocCount++;
+ if (allocCount > docFreeList.length) {
+ // Grow our free list up front to make sure we have
+ // enough space to recycle all outstanding PerDoc
+ // instances
+ assert allocCount == 1+docFreeList.length;
+ docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
+ }
+ return new PerDoc();
+ } else
+ return docFreeList[--freeCount];
+ }
+
+ void freePerDoc(PerDoc perDoc) {
+ assert freeCount < docFreeList.length;
+ docFreeList[freeCount++] = perDoc;
+ }
+
+ class PerDoc extends DocumentsWriterPerThread.DocWriter {
+
+ DocumentsWriterPerThread.DocWriter one;
+ DocumentsWriterPerThread.DocWriter two;
+
+ @Override
+ public long sizeInBytes() {
+ return one.sizeInBytes() + two.sizeInBytes();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ try {
+ try {
+ one.finish();
+ } finally {
+ two.finish();
+ }
+ } finally {
+ freePerDoc(this);
+ }
+ }
+
+ @Override
+ public void abort() {
+ try {
+ try {
+ one.abort();
+ } finally {
+ two.abort();
+ }
+ } finally {
+ freePerDoc(this);
+ }
+ }
}
}
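
[Editor's sketch, not part of this commit: processDocument() above uses a generation-stamp idiom to detect the first occurrence of a field in the current document without clearing every known field up front. fieldGen is bumped once per document, and each per-field entry remembers the generation in which it was last touched; a mismatch means "first time in this doc". A stripped-down, self-contained sketch of the same idiom, with hypothetical class and field names:]

class FieldGenSketch {
  static final class PerField {
    int lastGen = -1;  // generation this field was last seen in
    int count;         // occurrences of the field in the current document
  }

  private int fieldGen;

  void processDocument(PerField[] fieldsInDoc) {
    final int thisFieldGen = fieldGen++;   // one new generation per document
    for (PerField fp : fieldsInDoc) {
      if (fp.lastGen != thisFieldGen) {    // first occurrence in this document
        fp.count = 0;                      // reset lazily, only for fields actually present
        fp.lastGen = thisFieldGen;
      }
      fp.count++;
    }
  }
}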
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Wed Jul 21 10:27:20 2010
@@ -34,8 +34,8 @@ final class DocFieldProcessorPerField {
int fieldCount;
Fieldable[] fields = new Fieldable[1];
- public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
- this.consumer = perThread.consumer.addField(fieldInfo);
+ public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
+ this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
this.fieldInfo = fieldInfo;
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Wed Jul 21 10:27:20 2010
@@ -18,12 +18,13 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
-
import java.util.Map;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
+import org.apache.lucene.util.AttributeSource;
+
/** This is a DocFieldConsumer that inverts each field,
* separately, from a Document, and accepts a
@@ -34,7 +35,32 @@ final class DocInverter extends DocField
final InvertedDocConsumer consumer;
final InvertedDocEndConsumer endConsumer;
- public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+ final DocumentsWriterPerThread.DocState docState;
+
+ final FieldInvertState fieldState = new FieldInvertState();
+
+ final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
+
+ static class SingleTokenAttributeSource extends AttributeSource {
+ final CharTermAttribute termAttribute;
+ final OffsetAttribute offsetAttribute;
+
+ private SingleTokenAttributeSource() {
+ termAttribute = addAttribute(CharTermAttribute.class);
+ offsetAttribute = addAttribute(OffsetAttribute.class);
+ }
+
+ public void reinit(String stringValue, int startOffset, int endOffset) {
+ termAttribute.setEmpty().append(stringValue);
+ offsetAttribute.setOffset(startOffset, endOffset);
+ }
+ }
+
+ // Used to read a string value for a field
+ final ReusableStringReader stringReader = new ReusableStringReader();
+
+ public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+ this.docState = docState;
this.consumer = consumer;
this.endConsumer = endConsumer;
}
@@ -47,33 +73,37 @@ final class DocInverter extends DocField
}
@Override
- void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
- Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
- Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
-
- for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
+ void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
+ Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
+ Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
- DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
-
- Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
- Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
- for (final DocFieldConsumerPerField field: entry.getValue() ) {
- DocInverterPerField perField = (DocInverterPerField) field;
- childFields.add(perField.consumer);
- endChildFields.add(perField.endConsumer);
- }
-
- childThreadsAndFields.put(perThread.consumer, childFields);
- endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
+ for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+ DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
+ childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
+ endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
}
- consumer.flush(childThreadsAndFields, state);
- endConsumer.flush(endChildThreadsAndFields, state);
+ consumer.flush(childFieldsToFlush, state);
+ endConsumer.flush(endChildFieldsToFlush, state);
+ }
+
+ @Override
+ public void startDocument() throws IOException {
+ consumer.startDocument();
+ endConsumer.startDocument();
}
@Override
+ public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
+ // TODO: allow endConsumer.finishDocument to also return
+ // a DocWriter
+ endConsumer.finishDocument();
+ return consumer.finishDocument();
+ }
+
+
+ @Override
public void closeDocStore(SegmentWriteState state) throws IOException {
consumer.closeDocStore(state);
endConsumer.closeDocStore(state);
@@ -81,17 +111,21 @@ final class DocInverter extends DocField
@Override
void abort() {
- consumer.abort();
- endConsumer.abort();
+ try {
+ consumer.abort();
+ } finally {
+ endConsumer.abort();
+ }
}
@Override
public boolean freeRAM() {
return consumer.freeRAM();
}
-
+
@Override
- public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
- return new DocInverterPerThread(docFieldProcessorPerThread, this);
+ public DocFieldConsumerPerField addField(FieldInfo fi) {
+ return new DocInverterPerField(this, fi);
}
+
}
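
[Editor's sketch, not part of this commit: the SingleTokenAttributeSource added above lets an un-tokenized field travel through the same attribute-based path as analyzed text, by exposing the whole string value as one pre-built token. A minimal stand-alone sketch of that idea; the attribute classes are the ones the diff already imports, while the sketch class itself is hypothetical.]

import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.util.AttributeSource;

class SingleTokenSketch extends AttributeSource {
  private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
  private final OffsetAttribute offset = addAttribute(OffsetAttribute.class);

  // Expose an un-tokenized field value as a single token whose offsets span the whole value.
  void reinit(String value) {
    term.setEmpty().append(value);
    offset.setOffset(0, value.length());
  }

  public static void main(String[] args) {
    SingleTokenSketch single = new SingleTokenSketch();
    single.reinit("NOT_ANALYZED_VALUE");
    System.out.println(single.getAttribute(CharTermAttribute.class)
        + " [0," + single.getAttribute(OffsetAttribute.class).endOffset() + "]");  // NOT_ANALYZED_VALUE [0,18]
  }
}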
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Wed Jul 21 10:27:20 2010
@@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
final class DocInverterPerField extends DocFieldConsumerPerField {
- final private DocInverterPerThread perThread;
- final private FieldInfo fieldInfo;
+ final private DocInverter parent;
+ final FieldInfo fieldInfo;
final InvertedDocConsumerPerField consumer;
final InvertedDocEndConsumerPerField endConsumer;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final FieldInvertState fieldState;
- public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
- this.perThread = perThread;
+ public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
+ this.parent = parent;
this.fieldInfo = fieldInfo;
- docState = perThread.docState;
- fieldState = perThread.fieldState;
- this.consumer = perThread.consumer.addField(this, fieldInfo);
- this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
+ docState = parent.docState;
+ fieldState = parent.fieldState;
+ this.consumer = parent.consumer.addField(this, fieldInfo);
+ this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
}
@Override
@@ -84,8 +84,8 @@ final class DocInverterPerField extends
if (!field.isTokenized()) { // un-tokenized field
String stringValue = field.stringValue();
final int valueLength = stringValue.length();
- perThread.singleToken.reinit(stringValue, 0, valueLength);
- fieldState.attributeSource = perThread.singleToken;
+ parent.singleToken.reinit(stringValue, 0, valueLength);
+ fieldState.attributeSource = parent.singleToken;
consumer.start(field);
boolean success = false;
@@ -93,8 +93,9 @@ final class DocInverterPerField extends
consumer.add();
success = true;
} finally {
- if (!success)
+ if (!success) {
docState.docWriter.setAborting();
+ }
}
fieldState.offset += valueLength;
fieldState.length++;
@@ -119,8 +120,8 @@ final class DocInverterPerField extends
if (stringValue == null) {
throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
}
- perThread.stringReader.init(stringValue);
- reader = perThread.stringReader;
+ parent.stringReader.init(stringValue);
+ reader = parent.stringReader;
}
// Tokenize field and add to postingTable
@@ -173,8 +174,9 @@ final class DocInverterPerField extends
consumer.add();
success = true;
} finally {
- if (!success)
+ if (!success) {
docState.docWriter.setAborting();
+ }
}
fieldState.position++;
if (++fieldState.length >= maxFieldLength) {
@@ -208,4 +210,9 @@ final class DocInverterPerField extends
consumer.finish();
endConsumer.finish();
}
+
+ @Override
+ FieldInfo getFieldInfo() {
+ return this.fieldInfo;
+ }
}
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,459 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.codecs.Codec;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMFile;
+import org.apache.lucene.util.ArrayUtil;
+
+public class DocumentsWriterPerThread {
+
+ /**
+ * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
+ * which returns the DocConsumer that the DocumentsWriter calls to process the
+ * documents.
+ */
+ abstract static class IndexingChain {
+ abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
+ }
+
+
+ static final IndexingChain defaultIndexingChain = new IndexingChain() {
+
+ @Override
+ DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
+ /*
+ This is the current indexing chain:
+
+ DocConsumer / DocConsumerPerThread
+ --> code: DocFieldProcessor / DocFieldProcessorPerThread
+ --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
+ --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
+ --> code: DocInverter / DocInverterPerThread / DocInverterPerField
+ --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+ --> code: TermsHash / TermsHashPerThread / TermsHashPerField
+ --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
+ --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
+ --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
+ --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+ --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
+ --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
+ */
+
+ // Build up indexing chain:
+
+ final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
+ final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+ final InvertedDocConsumer termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
+ new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
+ final NormsWriter normsWriter = new NormsWriter();
+ final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
+ return new DocFieldProcessor(documentsWriterPerThread, docInverter);
+ }
+ };
+
+ static class DocState {
+ final DocumentsWriterPerThread docWriter;
+ Analyzer analyzer;
+ int maxFieldLength;
+ PrintStream infoStream;
+ Similarity similarity;
+ int docID;
+ Document doc;
+ String maxTermPrefix;
+
+ DocState(DocumentsWriterPerThread docWriter) {
+ this.docWriter = docWriter;
+ }
+
+ // Only called by asserts
+ public boolean testPoint(String name) {
+ return docWriter.writer.testPoint(name);
+ }
+ }
+
+ /** Called if we hit an exception at a bad time (when
+ * updating the index files) and must discard all
+ * currently buffered docs. This resets our state,
+ * discarding any docs added since last flush. */
+ void abort() throws IOException {
+ try {
+ if (infoStream != null) {
+ message("docWriter: now abort");
+ }
+ try {
+ consumer.abort();
+ } catch (Throwable t) {
+ }
+
+ docStoreSegment = null;
+ numDocsInStore = 0;
+ docStoreOffset = 0;
+
+ // Reset all postings data
+ doAfterFlush();
+
+ } finally {
+ aborting = false;
+ if (infoStream != null) {
+ message("docWriter: done abort");
+ }
+ }
+ }
+
+
+ final DocumentsWriterRAMAllocator ramAllocator = new DocumentsWriterRAMAllocator();
+
+ final DocumentsWriter parent;
+ final IndexWriter writer;
+
+ final Directory directory;
+ final DocState docState;
+ final DocConsumer consumer;
+ private DocFieldProcessor docFieldProcessor;
+
+ String segment; // Current segment we are working on
+ private String docStoreSegment; // Current doc-store segment we are writing
+ private int docStoreOffset; // Current starting doc-store offset of current segment
+ boolean aborting; // True if an abort is pending
+
+ private final PrintStream infoStream;
+ private int numDocsInRAM;
+ private int numDocsInStore;
+ private int flushedDocCount;
+ SegmentWriteState flushState;
+
+ long[] sequenceIDs = new long[8];
+
+ final List<String> closedFiles = new ArrayList<String>();
+
+ long numBytesUsed;
+
+ public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+ this.directory = directory;
+ this.parent = parent;
+ this.writer = parent.indexWriter;
+ this.infoStream = parent.indexWriter.getInfoStream();
+ this.docState = new DocState(this);
+ this.docState.similarity = parent.config.getSimilarity();
+ this.docState.maxFieldLength = parent.config.getMaxFieldLength();
+
+ consumer = indexingChain.getChain(this);
+ if (consumer instanceof DocFieldProcessor) {
+ docFieldProcessor = (DocFieldProcessor) consumer;
+ }
+
+ }
+
+ void setAborting() {
+ aborting = true;
+ }
+
+ public void addDocument(Document doc, Analyzer analyzer) throws IOException {
+ docState.doc = doc;
+ docState.analyzer = analyzer;
+ docState.docID = numDocsInRAM;
+ initSegmentName(false);
+
+ final DocWriter perDoc;
+
+ boolean success = false;
+ try {
+ perDoc = consumer.processDocument();
+
+ success = true;
+ } finally {
+ if (!success) {
+ if (!aborting) {
+ // mark document as deleted
+ commitDocument(-1);
+ }
+ }
+ }
+
+ success = false;
+ try {
+ if (perDoc != null) {
+ perDoc.finish();
+ }
+
+ success = true;
+ } finally {
+ if (!success) {
+ setAborting();
+ }
+ }
+
+ }
+
+ public void commitDocument(long sequenceID) {
+ if (numDocsInRAM == sequenceIDs.length) {
+ sequenceIDs = ArrayUtil.grow(sequenceIDs);
+ }
+
+ sequenceIDs[numDocsInRAM] = sequenceID;
+ numDocsInRAM++;
+ numDocsInStore++;
+ }
+
+ int getNumDocsInRAM() {
+ return numDocsInRAM;
+ }
+
+ long getMinSequenceID() {
+ if (numDocsInRAM == 0) {
+ return -1;
+ }
+ return sequenceIDs[0];
+ }
+
+ /** Returns true if any of the fields in the current
+ * buffered docs have omitTermFreqAndPositions==false */
+ boolean hasProx() {
+ return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
+ : true;
+ }
+
+ Codec getCodec() {
+ return flushState.codec;
+ }
+
+ void initSegmentName(boolean onlyDocStore) {
+ if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
+ // this call is synchronized on IndexWriter.segmentInfos
+ segment = writer.newSegmentName();
+ assert numDocsInRAM == 0;
+ }
+ if (docStoreSegment == null) {
+ docStoreSegment = segment;
+ assert numDocsInStore == 0;
+ }
+ }
+
+
+ private void initFlushState(boolean onlyDocStore) {
+ initSegmentName(onlyDocStore);
+ flushState = new SegmentWriteState(infoStream, directory, segment, docFieldProcessor.fieldInfos,
+ docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
+ writer.codecs);
+ }
+
+ /** Reset after a flush */
+ private void doAfterFlush() throws IOException {
+ segment = null;
+ numDocsInRAM = 0;
+ }
+
+ /** Flush all pending docs to a new segment */
+ SegmentInfo flush(boolean closeDocStore) throws IOException {
+ assert numDocsInRAM > 0;
+
+ initFlushState(closeDocStore);
+
+ docStoreOffset = numDocsInStore;
+
+ if (infoStream != null) {
+ message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
+ }
+
+ boolean success = false;
+
+ try {
+
+ if (closeDocStore) {
+ assert flushState.docStoreSegmentName != null;
+ assert flushState.docStoreSegmentName.equals(flushState.segmentName);
+ closeDocStore();
+ flushState.numDocsInStore = 0;
+ }
+
+ consumer.flush(flushState);
+
+ if (infoStream != null) {
+ SegmentInfo si = new SegmentInfo(flushState.segmentName,
+ flushState.numDocs,
+ directory, false,
+ docStoreOffset, flushState.docStoreSegmentName,
+ false,
+ hasProx(),
+ getCodec());
+
+ final long newSegmentSize = si.sizeInBytes();
+ String message = " ramUsed=" + ramAllocator.nf.format(((double) numBytesUsed)/1024./1024.) + " MB" +
+ " newFlushedSize=" + newSegmentSize +
+ " docs/MB=" + ramAllocator.nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+ " new/old=" + ramAllocator.nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
+ message(message);
+ }
+
+ flushedDocCount += flushState.numDocs;
+
+ long maxSequenceID = sequenceIDs[numDocsInRAM-1];
+ doAfterFlush();
+
+ // Create new SegmentInfo, but do not add to our
+ // segmentInfos until deletes are flushed
+ // successfully.
+ SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
+ flushState.numDocs,
+ directory, false,
+ docStoreOffset, flushState.docStoreSegmentName,
+ false,
+ hasProx(),
+ getCodec());
+
+
+ newSegment.setMinSequenceID(sequenceIDs[0]);
+ newSegment.setMaxSequenceID(maxSequenceID);
+
+ IndexWriter.setDiagnostics(newSegment, "flush");
+ success = true;
+
+ return newSegment;
+ } finally {
+ if (!success) {
+ setAborting();
+ }
+ }
+ }
+
+ /** Closes the currently open doc stores and returns the doc
+ * store segment name. This returns null if there are
+ * no buffered documents. */
+ String closeDocStore() throws IOException {
+
+ // nocommit
+// if (infoStream != null)
+// message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+
+ boolean success = false;
+
+ try {
+ initFlushState(true);
+ closedFiles.clear();
+
+ consumer.closeDocStore(flushState);
+ // nocommit
+ //assert 0 == openFiles.size();
+
+ String s = docStoreSegment;
+ docStoreSegment = null;
+ docStoreOffset = 0;
+ numDocsInStore = 0;
+ success = true;
+ return s;
+ } finally {
+ if (!success) {
+ parent.abort();
+ }
+ }
+ }
+
+
+ /** Get current segment name we are writing. */
+ String getSegment() {
+ return segment;
+ }
+
+ /** Returns the current doc store segment we are writing
+ * to. */
+ String getDocStoreSegment() {
+ return docStoreSegment;
+ }
+
+ /** Returns the doc offset into the shared doc store for
+ * the current buffered docs. */
+ int getDocStoreOffset() {
+ return docStoreOffset;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ List<String> closedFiles() {
+ return (List<String>) ((ArrayList<String>) closedFiles).clone();
+ }
+
+ void addOpenFile(String name) {
+ synchronized(parent.openFiles) {
+ assert !parent.openFiles.contains(name);
+ parent.openFiles.add(name);
+ }
+ }
+
+ void removeOpenFile(String name) {
+ synchronized(parent.openFiles) {
+ assert parent.openFiles.contains(name);
+ parent.openFiles.remove(name);
+ }
+ closedFiles.add(name);
+ }
+
+ /** Consumer returns this on each doc. It holds any
+ * per-document state that must be flushed in docID
+ * order. We gather these and flush them in order. */
+ abstract static class DocWriter {
+ DocWriter next;
+ int docID;
+ abstract void finish() throws IOException;
+ abstract void abort();
+ abstract long sizeInBytes();
+
+ void setNext(DocWriter next) {
+ this.next = next;
+ }
+ }
+
+ /**
+ * Create and return a new PerDocBuffer.
+ */
+ PerDocBuffer newPerDocBuffer() {
+ return new PerDocBuffer();
+ }
+
+ /**
+ * RAMFile buffer for DocWriters.
+ */
+ class PerDocBuffer extends RAMFile {
+
+ /**
+ * Allocate bytes used from shared pool.
+ */
+ protected byte[] newBuffer(int size) {
+ assert size == DocumentsWriterRAMAllocator.PER_DOC_BLOCK_SIZE;
+ return ramAllocator.perDocAllocator.getByteBlock();
+ }
+
+ /**
+ * Recycle the bytes used.
+ */
+ synchronized void recycle() {
+ if (buffers.size() > 0) {
+ setLength(0);
+
+ // Recycle the blocks
+ ramAllocator.perDocAllocator.recycleByteBlocks(buffers);
+ buffers.clear();
+ sizeInBytes = 0;
+
+ assert numBuffers() == 0;
+ }
+ }
+ }
+
+ void bytesUsed(long numBytes) {
+ ramAllocator.bytesUsed(numBytes);
+ }
+
+ void message(String message) {
+ if (infoStream != null)
+ writer.message("DW: " + message);
+ }
+}
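
The DocWriter contract defined just above is what each consumer hands back per document: any state that could not be written immediately is carried in a DocWriter and written out later, in docID order, via finish(). A minimal standalone sketch of that contract, using invented class names rather than anything from this commit, might look like:

    import java.io.IOException;

    // Illustrative stand-in for the DocWriter contract; a simplified sketch,
    // not the committed DocumentsWriterPerThread.DocWriter.
    abstract class ExampleDocWriter {
      ExampleDocWriter next;   // writers can be chained per document
      int docID;
      abstract void finish() throws IOException;  // write buffered state for this doc
      abstract void abort();                      // discard buffered state
      abstract long sizeInBytes();                // RAM held until finish()/abort()
    }

    // A minimal concrete writer: buffers one payload per document and releases
    // it once the document is finished or aborted.
    class BufferedPayloadWriter extends ExampleDocWriter {
      private byte[] payload;

      BufferedPayloadWriter(int docID, byte[] payload) {
        this.docID = docID;
        this.payload = payload;
      }

      @Override
      void finish() throws IOException {
        // A real consumer would append to the doc store here, in docID order.
        payload = null;
      }

      @Override
      void abort() {
        payload = null;
      }

      @Override
      long sizeInBytes() {
        return payload == null ? 0 : payload.length;
      }
    }

In addDocument above, this is the perDoc value returned by consumer.processDocument(): it is finished immediately afterwards, and a failure in finish() marks the per-thread state as aborting.
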
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,148 @@
+package org.apache.lucene.index;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.Constants;
+
+class DocumentsWriterRAMAllocator {
+ final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
+ final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
+
+
+ class ByteBlockAllocator extends ByteBlockPool.Allocator {
+ final int blockSize;
+
+ ByteBlockAllocator(int blockSize) {
+ this.blockSize = blockSize;
+ }
+
+ ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
+
+ /* Allocate another byte[] from the shared pool */
+ @Override
+ byte[] getByteBlock() {
+ final int size = freeByteBlocks.size();
+ final byte[] b;
+ if (0 == size) {
+ b = new byte[blockSize];
+ // Always record a block allocated, even if
+ // trackAllocations is false. This is necessary
+ // because this block will be shared between
+ // things that don't track allocations (term
+ // vectors) and things that do (freq/prox
+ // postings).
+ numBytesUsed += blockSize;
+ } else
+ b = freeByteBlocks.remove(size-1);
+ return b;
+ }
+
+ /* Return byte[]'s to the pool */
+ @Override
+ void recycleByteBlocks(byte[][] blocks, int start, int end) {
+ for(int i=start;i<end;i++) {
+ freeByteBlocks.add(blocks[i]);
+ }
+ }
+
+ @Override
+ void recycleByteBlocks(List<byte[]> blocks) {
+ final int size = blocks.size();
+ for(int i=0;i<size;i++) {
+ freeByteBlocks.add(blocks.get(i));
+ }
+ }
+ }
+
+ private ArrayList<int[]> freeIntBlocks = new ArrayList<int[]>();
+
+ /* Allocate another int[] from the shared pool */
+ int[] getIntBlock() {
+ final int size = freeIntBlocks.size();
+ final int[] b;
+ if (0 == size) {
+ b = new int[INT_BLOCK_SIZE];
+ // Always record a block allocated, even if
+ // trackAllocations is false. This is necessary
+ // because this block will be shared between
+ // things that don't track allocations (term
+ // vectors) and things that do (freq/prox
+ // postings).
+ numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
+ } else
+ b = freeIntBlocks.remove(size-1);
+ return b;
+ }
+
+ void bytesUsed(long numBytes) {
+ numBytesUsed += numBytes;
+ }
+
+ /* Return int[]s to the pool */
+ void recycleIntBlocks(int[][] blocks, int start, int end) {
+ for(int i=start;i<end;i++)
+ freeIntBlocks.add(blocks[i]);
+ }
+
+ long getRAMUsed() {
+ return numBytesUsed;
+ }
+
+ long numBytesUsed;
+
+ NumberFormat nf = NumberFormat.getInstance();
+
+ final static int PER_DOC_BLOCK_SIZE = 1024;
+
+ // Coarse estimates used to measure RAM usage of buffered deletes
+ final static int OBJECT_HEADER_BYTES = 8;
+ final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
+ final static int INT_NUM_BYTE = 4;
+ final static int CHAR_NUM_BYTE = 2;
+
+ /* Rough logic: HashMap has an array[Entry] w/ varying
+ load factor (say 2 * POINTER). Entry is object w/ Term
+ key, BufferedDeletes.Num val, int hash, Entry next
+ (OBJ_HEADER + 3*POINTER + INT). Term is object w/
+ String field and String text (OBJ_HEADER + 2*POINTER).
+ We don't count Term's field since it's interned.
+ Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+ OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is
+ OBJ_HEADER + INT. */
+
+ final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
+
+ /* Rough logic: del docIDs are List<Integer>. Say list
+ allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
+ + int */
+ final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
+
+ /* Rough logic: HashMap has an array[Entry] w/ varying
+ load factor (say 2 * POINTER). Entry is object w/
+ Query key, Integer val, int hash, Entry next
+ (OBJ_HEADER + 3*POINTER + INT). Query we often
+ undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
+ final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
+
+ /* Initial chunk size of the shared byte[] blocks used to
+ store postings data */
+ final static int BYTE_BLOCK_SHIFT = 15;
+ final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
+ final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
+ final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
+
+ final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
+
+ /* Initial chunk size of the shared int[] blocks used to
+ store postings data */
+ final static int INT_BLOCK_SHIFT = 13;
+ final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
+ final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+
+ String toMB(long v) {
+ return nf.format(v/1024./1024.);
+ }
+
+}
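
DocumentsWriterRAMAllocator gathers the byte/int block pools and the coarse RAM accounting into one per-thread helper. With the constants above, a buffered delete-by-Term entry is estimated at BYTES_PER_DEL_TERM = 8*POINTER + 5*OBJ_HEADER + 6*INT, i.e. 8*8 + 5*8 + 6*4 = 128 bytes on a 64-bit JVM. The allocate/recycle cycle itself is a plain free-list pattern; the following self-contained sketch (class name, block-size constant and the main() driver are illustrative, not part of the commit) shows the idea:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative free-list allocator in the style of DocumentsWriterRAMAllocator.
    class ExampleBlockAllocator {
      static final int BLOCK_SIZE = 1 << 15;   // 32 KB, mirrors BYTE_BLOCK_SIZE

      private final List<byte[]> freeBlocks = new ArrayList<byte[]>();
      long bytesUsed;                          // coarse RAM accounting

      byte[] getBlock() {
        int size = freeBlocks.size();
        if (size == 0) {
          bytesUsed += BLOCK_SIZE;             // only new allocations are counted
          return new byte[BLOCK_SIZE];
        }
        return freeBlocks.remove(size - 1);    // reuse a recycled block
      }

      void recycle(List<byte[]> blocks) {
        freeBlocks.addAll(blocks);             // recycled blocks stay counted in bytesUsed
        blocks.clear();
      }

      public static void main(String[] args) {
        ExampleBlockAllocator alloc = new ExampleBlockAllocator();
        List<byte[]> inUse = new ArrayList<byte[]>();
        for (int i = 0; i < 4; i++) {
          inUse.add(alloc.getBlock());
        }
        System.out.println("bytes used after 4 blocks: " + alloc.bytesUsed);
        alloc.recycle(inUse);
        alloc.getBlock();                      // served from the free list, no growth
        System.out.println("bytes used after recycle + reuse: " + alloc.bytesUsed);
      }
    }

As in the committed allocator, recycled blocks remain counted in the RAM estimate; returning them to the free list does not by itself lower numBytesUsed.
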
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,255 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+abstract class DocumentsWriterThreadPool {
+ public static abstract class Task<T> {
+ private boolean clearThreadBindings = false;
+
+ protected void clearThreadBindings() {
+ this.clearThreadBindings = true;
+ }
+
+ boolean doClearThreadBindings() {
+ return clearThreadBindings;
+ }
+ }
+
+ public static abstract class PerThreadTask<T> extends Task<T> {
+ abstract T process(final DocumentsWriterPerThread perThread) throws IOException;
+ }
+
+ public static abstract class AllThreadsTask<T> extends Task<T> {
+ abstract T process(final Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException;
+ }
+
+ protected abstract static class ThreadState {
+ private DocumentsWriterPerThread perThread;
+ private boolean isIdle = true;
+
+ void start() {/* extension hook */}
+ void finish() {/* extension hook */}
+ }
+
+ private int pauseThreads = 0;
+
+ protected final int maxNumThreadStates;
+ protected ThreadState[] allThreadStates = new ThreadState[0];
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition threadStateAvailable = lock.newCondition();
+ private boolean globalLock;
+ private boolean aborting;
+
+ DocumentsWriterThreadPool(int maxNumThreadStates) {
+ this.maxNumThreadStates = (maxNumThreadStates < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
+ }
+
+ public final int getMaxThreadStates() {
+ return this.maxNumThreadStates;
+ }
+
+ void pauseAllThreads() {
+ lock.lock();
+ try {
+ pauseThreads++;
+ while(!allThreadsIdle()) {
+ try {
+ threadStateAvailable.await();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void resumeAllThreads() {
+ lock.lock();
+ try {
+ pauseThreads--;
+ assert pauseThreads >= 0;
+ if (0 == pauseThreads) {
+ threadStateAvailable.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean allThreadsIdle() {
+ for (ThreadState state : allThreadStates) {
+ if (!state.isIdle) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ void abort() throws IOException {
+ pauseAllThreads();
+ aborting = true;
+ for (ThreadState state : allThreadStates) {
+ state.perThread.abort();
+ }
+ }
+
+ void finishAbort() {
+ aborting = false;
+ resumeAllThreads();
+ }
+
+ public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
+ T result = null;
+
+ lock.lock();
+ try {
+ try {
+ while (globalLock) {
+ threadStateAvailable.await();
+ }
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+
+ globalLock = true;
+ pauseAllThreads();
+ } finally {
+ lock.unlock();
+ }
+
+
+ // all threads are idle now
+
+ try {
+ final ThreadState[] localAllThreads = allThreadStates;
+
+ result = task.process(new Iterator<DocumentsWriterPerThread>() {
+ int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i < localAllThreads.length;
+ }
+
+ @Override
+ public DocumentsWriterPerThread next() {
+ return localAllThreads[i++].perThread;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported.");
+ }
+ });
+ return result;
+ } finally {
+ lock.lock();
+ try {
+ try {
+ if (task.doClearThreadBindings()) {
+ clearAllThreadBindings();
+ }
+ } finally {
+ globalLock = false;
+ resumeAllThreads();
+ threadStateAvailable.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ }
+ }
+
+
+ public final <T> T executePerThread(DocumentsWriter documentsWriter, Document doc, PerThreadTask<T> task) throws IOException {
+ ThreadState state = acquireThreadState(documentsWriter, doc);
+ boolean success = false;
+ try {
+ T result = task.process(state.perThread);
+ success = true;
+ return result;
+ } finally {
+ boolean abort = false;
+ if (!success && state.perThread.aborting) {
+ state.perThread.aborting = false;
+ abort = true;
+ }
+
+ returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
+
+ if (abort) {
+ documentsWriter.abort();
+ }
+ }
+ }
+
+ protected final <T extends ThreadState> T addNewThreadState(DocumentsWriter documentsWriter, T threadState) {
+ // Just create a new "private" thread state
+ ThreadState[] newArray = new ThreadState[1+allThreadStates.length];
+ if (allThreadStates.length > 0)
+ System.arraycopy(allThreadStates, 0, newArray, 0, allThreadStates.length);
+ threadState.perThread = documentsWriter.newDocumentsWriterPerThread();
+ newArray[allThreadStates.length] = threadState;
+
+ allThreadStates = newArray;
+ return threadState;
+ }
+
+ protected abstract ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
+ protected void clearThreadBindings(ThreadState flushedThread) {
+ // subclasses can optionally override this to cleanup after a thread flushed
+ }
+
+ protected void clearAllThreadBindings() {
+ // subclasses can optionally override this to cleanup after a thread flushed
+ }
+
+
+ private final ThreadState acquireThreadState(DocumentsWriter documentsWriter, Document doc) {
+ lock.lock();
+ try {
+ ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
+
+ try {
+ while (!threadState.isIdle || globalLock || aborting) {
+ threadStateAvailable.await();
+ }
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+
+ threadState.isIdle = false;
+ threadState.start();
+
+ return threadState;
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private final void returnDocumentsWriterPerThread(ThreadState state, boolean clearThreadBindings) {
+ lock.lock();
+ try {
+ state.finish();
+ if (clearThreadBindings) {
+ clearThreadBindings(state);
+ }
+ state.isIdle = true;
+ threadStateAvailable.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
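
DocumentsWriterThreadPool is the new indirection between IndexWriter and the per-thread writers: normal indexing work goes through executePerThread against exactly one DocumentsWriterPerThread, while global operations such as flush and abort go through executeAllThreads, which first waits for every thread state to become idle. Reduced to a single thread state, the acquire/run/release cycle looks roughly like the sketch below; every name in it is invented for illustration, and the real pool additionally manages an array of ThreadState objects, a global lock and an aborting flag:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative acquire/run/release cycle in the style of executePerThread.
    class ExampleThreadPool<S> {
      interface PerThreadTask<U, T> {
        T process(U state) throws Exception;
      }

      private final ReentrantLock lock = new ReentrantLock();
      private final Condition stateAvailable = lock.newCondition();
      private final S state;        // one state for brevity; the real pool has many
      private boolean idle = true;

      ExampleThreadPool(S state) {
        this.state = state;
      }

      <T> T executePerThread(PerThreadTask<S, T> task) throws Exception {
        lock.lock();
        try {
          while (!idle) {
            stateAvailable.await();    // wait until no other thread holds this state
          }
          idle = false;
        } finally {
          lock.unlock();
        }

        try {
          return task.process(state); // runs outside the lock, like the real pool
        } finally {
          lock.lock();
          try {
            idle = true;
            stateAvailable.signalAll();
          } finally {
            lock.unlock();
          }
        }
      }
    }

A concrete subclass such as ThreadAffinityDocumentsWriterThreadPool then only needs to implement selectThreadState to decide which ThreadState a given thread/document combination is bound to.
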
Re: svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/
branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/
trunk/lucen...
Posted by Michael Busch <bu...@gmail.com>.
Oops, I messed something up. Two files ended up in trunk (something
went wrong with my svn switch) :(
Will try to fix it now...
Michael
On 7/21/10 3:27 AM, buschmi@apache.org wrote:
> Author: buschmi
> Date: Wed Jul 21 10:27:20 2010
> New Revision: 966168
>
> URL: http://svn.apache.org/viewvc?rev=966168&view=rev
> Log:
> LUCENE-2324: Committing second version of the patch to the real-time branch. It's not done yet, but easier to track progress using the branch.
>
> Added:
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/util/ThreadSafeCloneableSortedMap.java
> Removed:
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
> lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
> Modified:
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriter.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
> lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestByteSlices.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
> lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
> lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
> lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
>
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,70 @@
> +package org.apache.lucene.index;
> +
> +import java.util.TreeMap;
> +
> +import org.apache.lucene.search.Query;
> +import org.apache.lucene.util.ThreadSafeCloneableSortedMap;
> +
> +public class BufferedDeletesInRAM {
> + static class Delete {
> + int flushCount;
> +
> + public Delete(int flushCount) {
> + this.flushCount = flushCount;
> + }
> + }
> +
> + final static class DeleteTerm extends Delete {
> + final Term term;
> +
> + public DeleteTerm(Term term, int flushCount) {
> + super(flushCount);
> + this.term = term;
> + }
> + }
> +
> + final static class DeleteTerms extends Delete {
> + final Term[] terms;
> +
> + public DeleteTerms(Term[] terms, int flushCount) {
> + super(flushCount);
> + this.terms = terms;
> + }
> + }
> +
> + final static class DeleteQuery extends Delete {
> + final Query query;
> +
> + public DeleteQuery(Query query, int flushCount) {
> + super(flushCount);
> + this.query = query;
> + }
> + }
> +
> + final ThreadSafeCloneableSortedMap<Long, Delete> deletes = ThreadSafeCloneableSortedMap
> + .getThreadSafeSortedMap(new TreeMap<Long, Delete>());
> +
> + final void addDeleteTerm(Term term, long sequenceID, int numThreadStates) {
> + deletes.put(sequenceID, new DeleteTerm(term, numThreadStates));
> + }
> +
> + final void addDeleteTerms(Term[] terms, long sequenceID, int numThreadStates) {
> + deletes.put(sequenceID, new DeleteTerms(terms, numThreadStates));
> + }
> +
> + final void addDeleteQuery(Query query, long sequenceID, int numThreadStates) {
> + deletes.put(sequenceID, new DeleteQuery(query, numThreadStates));
> + }
> +
> + boolean hasDeletes() {
> + return !deletes.isEmpty();
> + }
> +
> + void clear() {
> + deletes.clear();
> + }
> +
> + int getNumDeletes() {
> + return this.deletes.size();
> + }
> +}
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java Wed Jul 21 10:27:20 2010
> @@ -50,10 +50,10 @@ final class ByteBlockPool {
> public byte[][] buffers = new byte[10][];
>
> int bufferUpto = -1; // Which buffer we are upto
> - public int byteUpto = DocumentsWriter.BYTE_BLOCK_SIZE; // Where we are in head buffer
> + public int byteUpto = DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE; // Where we are in head buffer
>
> public byte[] buffer; // Current head buffer
> - public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE; // Current head offset
> + public int byteOffset = -DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE; // Current head offset
>
> private final Allocator allocator;
>
> @@ -95,11 +95,11 @@ final class ByteBlockPool {
> bufferUpto++;
>
> byteUpto = 0;
> - byteOffset += DocumentsWriter.BYTE_BLOCK_SIZE;
> + byteOffset += DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> }
>
> public int newSlice(final int size) {
> - if (byteUpto> DocumentsWriter.BYTE_BLOCK_SIZE-size)
> + if (byteUpto> DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-size)
> nextBuffer();
> final int upto = byteUpto;
> byteUpto += size;
> @@ -123,7 +123,7 @@ final class ByteBlockPool {
> final int newSize = levelSizeArray[newLevel];
>
> // Maybe allocate another block
> - if (byteUpto> DocumentsWriter.BYTE_BLOCK_SIZE-newSize)
> + if (byteUpto> DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-newSize)
> nextBuffer();
>
> final int newUpto = byteUpto;
> @@ -151,8 +151,8 @@ final class ByteBlockPool {
> // Fill in a BytesRef from term's length& bytes encoded in
> // byte block
> final BytesRef setBytesRef(BytesRef term, int textStart) {
> - final byte[] bytes = term.bytes = buffers[textStart>> DocumentsWriter.BYTE_BLOCK_SHIFT];
> - int pos = textStart& DocumentsWriter.BYTE_BLOCK_MASK;
> + final byte[] bytes = term.bytes = buffers[textStart>> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
> + int pos = textStart& DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
> if ((bytes[pos]& 0x80) == 0) {
> // length is 1 byte
> term.length = bytes[pos];
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java Wed Jul 21 10:27:20 2010
> @@ -48,16 +48,16 @@ final class ByteSliceReader extends Data
> this.endIndex = endIndex;
>
> level = 0;
> - bufferUpto = startIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
> - bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
> + bufferUpto = startIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> + bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> buffer = pool.buffers[bufferUpto];
> - upto = startIndex& DocumentsWriter.BYTE_BLOCK_MASK;
> + upto = startIndex& DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>
> final int firstSize = ByteBlockPool.levelSizeArray[0];
>
> if (startIndex+firstSize>= endIndex) {
> // There is only this one slice to read
> - limit = endIndex& DocumentsWriter.BYTE_BLOCK_MASK;
> + limit = endIndex& DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
> } else
> limit = upto+firstSize-4;
> }
> @@ -102,11 +102,11 @@ final class ByteSliceReader extends Data
> level = ByteBlockPool.nextLevelArray[level];
> final int newSize = ByteBlockPool.levelSizeArray[level];
>
> - bufferUpto = nextIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
> - bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
> + bufferUpto = nextIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> + bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>
> buffer = pool.buffers[bufferUpto];
> - upto = nextIndex& DocumentsWriter.BYTE_BLOCK_MASK;
> + upto = nextIndex& DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>
> if (nextIndex + newSize>= endIndex) {
> // We are advancing to the final slice
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Wed Jul 21 10:27:20 2010
> @@ -42,9 +42,9 @@ final class ByteSliceWriter extends Data
> * Set up the writer to write at address.
> */
> public void init(int address) {
> - slice = pool.buffers[address>> DocumentsWriter.BYTE_BLOCK_SHIFT];
> + slice = pool.buffers[address>> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
> assert slice != null;
> - upto = address& DocumentsWriter.BYTE_BLOCK_MASK;
> + upto = address& DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
> offset0 = address;
> assert upto< slice.length;
> }
> @@ -80,6 +80,6 @@ final class ByteSliceWriter extends Data
> }
>
> public int getAddress() {
> - return upto + (offset0& DocumentsWriter.BYTE_BLOCK_NOT_MASK);
> + return upto + (offset0& DocumentsWriterRAMAllocator.BYTE_BLOCK_NOT_MASK);
> }
> }
> \ No newline at end of file
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Wed Jul 21 10:27:20 2010
> @@ -18,11 +18,10 @@ package org.apache.lucene.index;
> */
>
> import java.io.IOException;
> -import java.util.Collection;
>
> abstract class DocConsumer {
> - abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
> - abstract void flush(final Collection<DocConsumerPerThread> threads, final SegmentWriteState state) throws IOException;
> + abstract DocumentsWriterPerThread.DocWriter processDocument() throws IOException;
> + abstract void flush(final SegmentWriteState state) throws IOException;
> abstract void closeDocStore(final SegmentWriteState state) throws IOException;
> abstract void abort();
> abstract boolean freeRAM();
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Wed Jul 21 10:27:20 2010
> @@ -18,7 +18,6 @@ package org.apache.lucene.index;
> */
>
> import java.io.IOException;
> -import java.util.Collection;
> import java.util.Map;
>
> abstract class DocFieldConsumer {
> @@ -27,7 +26,7 @@ abstract class DocFieldConsumer {
>
> /** Called when DocumentsWriter decides to create a new
> * segment */
> - abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
> + abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
>
> /** Called when DocumentsWriter decides to close the doc
> * stores */
> @@ -36,14 +35,17 @@ abstract class DocFieldConsumer {
> /** Called when an aborting exception is hit */
> abstract void abort();
>
> - /** Add a new thread */
> - abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
> -
> /** Called when DocumentsWriter is using too much RAM.
> * The consumer should free RAM, if possible, returning
> * true if any RAM was in fact freed. */
> abstract boolean freeRAM();
> +
> + abstract void startDocument() throws IOException;
>
> + abstract DocFieldConsumerPerField addField(FieldInfo fi);
> +
> + abstract DocumentsWriterPerThread.DocWriter finishDocument() throws IOException;
> +
> void setFieldInfos(FieldInfos fieldInfos) {
> this.fieldInfos = fieldInfos;
> }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Wed Jul 21 10:27:20 2010
> @@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField
> /** Processes all occurrences of a single field */
> abstract void processFields(Fieldable[] fields, int count) throws IOException;
> abstract void abort();
> + abstract FieldInfo getFieldInfo();
> }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Wed Jul 21 10:27:20 2010
> @@ -17,12 +17,9 @@ package org.apache.lucene.index;
> * limitations under the License.
> */
>
> +import java.io.IOException;
> import java.util.HashMap;
> -import java.util.Collection;
> -import java.util.Iterator;
> import java.util.Map;
> -import java.util.HashSet;
> -import java.io.IOException;
>
> import org.apache.lucene.util.ArrayUtil;
> import org.apache.lucene.util.RamUsageEstimator;
> @@ -33,10 +30,12 @@ import org.apache.lucene.util.RamUsageEs
> final class DocFieldConsumers extends DocFieldConsumer {
> final DocFieldConsumer one;
> final DocFieldConsumer two;
> + final DocumentsWriterPerThread.DocState docState;
>
> - public DocFieldConsumers(DocFieldConsumer one, DocFieldConsumer two) {
> + public DocFieldConsumers(DocFieldProcessor processor, DocFieldConsumer one, DocFieldConsumer two) {
> this.one = one;
> this.two = two;
> + this.docState = processor.docState;
> }
>
> @Override
> @@ -47,33 +46,19 @@ final class DocFieldConsumers extends Do
> }
>
> @Override
> - public void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
> -
> - Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> oneThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
> - Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> twoThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
> -
> - for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet()) {
> + public void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
>
> - final DocFieldConsumersPerThread perThread = (DocFieldConsumersPerThread) entry.getKey();
> + Map<FieldInfo, DocFieldConsumerPerField> oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
> + Map<FieldInfo, DocFieldConsumerPerField> twoFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
>
> - final Collection<DocFieldConsumerPerField> fields = entry.getValue();
> -
> - Iterator<DocFieldConsumerPerField> fieldsIt = fields.iterator();
> - Collection<DocFieldConsumerPerField> oneFields = new HashSet<DocFieldConsumerPerField>();
> - Collection<DocFieldConsumerPerField> twoFields = new HashSet<DocFieldConsumerPerField>();
> - while(fieldsIt.hasNext()) {
> - DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldsIt.next();
> - oneFields.add(perField.one);
> - twoFields.add(perField.two);
> - }
> -
> - oneThreadsAndFields.put(perThread.one, oneFields);
> - twoThreadsAndFields.put(perThread.two, twoFields);
> + for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
> + DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldToFlush.getValue();
> + oneFieldsToFlush.put(fieldToFlush.getKey(), perField.one);
> + twoFieldsToFlush.put(fieldToFlush.getKey(), perField.two);
> }
> -
>
> - one.flush(oneThreadsAndFields, state);
> - two.flush(twoThreadsAndFields, state);
> + one.flush(oneFieldsToFlush, state);
> + two.flush(twoFieldsToFlush, state);
> }
>
> @Override
> @@ -101,16 +86,11 @@ final class DocFieldConsumers extends Do
> return any;
> }
>
> - @Override
> - public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException {
> - return new DocFieldConsumersPerThread(docFieldProcessorPerThread, this, one.addThread(docFieldProcessorPerThread), two.addThread(docFieldProcessorPerThread));
> - }
> -
> PerDoc[] docFreeList = new PerDoc[1];
> int freeCount;
> int allocCount;
>
> - synchronized PerDoc getPerDoc() {
> + PerDoc getPerDoc() {
> if (freeCount == 0) {
> allocCount++;
> if (allocCount> docFreeList.length) {
> @@ -125,15 +105,15 @@ final class DocFieldConsumers extends Do
> return docFreeList[--freeCount];
> }
>
> - synchronized void freePerDoc(PerDoc perDoc) {
> + void freePerDoc(PerDoc perDoc) {
> assert freeCount< docFreeList.length;
> docFreeList[freeCount++] = perDoc;
> }
>
> - class PerDoc extends DocumentsWriter.DocWriter {
> + class PerDoc extends DocumentsWriterPerThread.DocWriter {
>
> - DocumentsWriter.DocWriter writerOne;
> - DocumentsWriter.DocWriter writerTwo;
> + DocumentsWriterPerThread.DocWriter writerOne;
> + DocumentsWriterPerThread.DocWriter writerTwo;
>
> @Override
> public long sizeInBytes() {
> @@ -166,4 +146,35 @@ final class DocFieldConsumers extends Do
> }
> }
> }
> +
> + @Override
> + public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
> + final DocumentsWriterPerThread.DocWriter oneDoc = one.finishDocument();
> + final DocumentsWriterPerThread.DocWriter twoDoc = two.finishDocument();
> + if (oneDoc == null)
> + return twoDoc;
> + else if (twoDoc == null)
> + return oneDoc;
> + else {
> + DocFieldConsumers.PerDoc both = getPerDoc();
> + both.docID = docState.docID;
> + assert oneDoc.docID == docState.docID;
> + assert twoDoc.docID == docState.docID;
> + both.writerOne = oneDoc;
> + both.writerTwo = twoDoc;
> + return both;
> + }
> + }
> +
> + @Override
> + public void startDocument() throws IOException {
> + one.startDocument();
> + two.startDocument();
> + }
> +
> + @Override
> + public DocFieldConsumerPerField addField(FieldInfo fi) {
> + return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
> + }
> +
> }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java Wed Jul 21 10:27:20 2010
> @@ -24,12 +24,14 @@ final class DocFieldConsumersPerField ex
>
> final DocFieldConsumerPerField one;
> final DocFieldConsumerPerField two;
> - final DocFieldConsumersPerThread perThread;
> + final DocFieldConsumers parent;
> + final FieldInfo fieldInfo;
>
> - public DocFieldConsumersPerField(DocFieldConsumersPerThread perThread, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
> - this.perThread = perThread;
> + public DocFieldConsumersPerField(DocFieldConsumers parent, FieldInfo fi, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
> + this.parent = parent;
> this.one = one;
> this.two = two;
> + this.fieldInfo = fi;
> }
>
> @Override
> @@ -46,4 +48,9 @@ final class DocFieldConsumersPerField ex
> two.abort();
> }
> }
> +
> + @Override
> + FieldInfo getFieldInfo() {
> + return fieldInfo;
> + }
> }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Wed Jul 21 10:27:20 2010
> @@ -19,8 +19,15 @@ package org.apache.lucene.index;
>
> import java.io.IOException;
> import java.util.Collection;
> -import java.util.Map;
> import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Fieldable;
> +import org.apache.lucene.util.ArrayUtil;
> +import org.apache.lucene.util.RamUsageEstimator;
>
>
> /**
> @@ -33,13 +40,27 @@ import java.util.HashMap;
>
> final class DocFieldProcessor extends DocConsumer {
>
> - final DocumentsWriter docWriter;
> final FieldInfos fieldInfos = new FieldInfos();
> final DocFieldConsumer consumer;
> final StoredFieldsWriter fieldsWriter;
>
> - public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
> - this.docWriter = docWriter;
> + // Holds all fields seen in current doc
> + DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
> + int fieldCount;
> +
> + // Hash table for all fields ever seen
> + DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
> + int hashMask = 1;
> + int totalFieldCount;
> +
> +
> + float docBoost;
> + int fieldGen;
> + final DocumentsWriterPerThread.DocState docState;
> +
> +
> + public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
> + this.docState = docWriter.docState;
> this.consumer = consumer;
> consumer.setFieldInfos(fieldInfos);
> fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
> @@ -52,16 +73,17 @@ final class DocFieldProcessor extends Do
> }
>
> @Override
> - public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
> + public void flush(SegmentWriteState state) throws IOException {
>
> - Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
> - for ( DocConsumerPerThread thread : threads) {
> - DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
> - childThreadsAndFields.put(perThread.consumer, perThread.fields());
> - perThread.trimFields(state);
> + Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
> + Collection<DocFieldConsumerPerField> fields = fields();
> + for (DocFieldConsumerPerField f : fields) {
> + childFields.put(f.getFieldInfo(), f);
> }
> + trimFields(state);
> +
> fieldsWriter.flush(state);
> - consumer.flush(childThreadsAndFields, state);
> + consumer.flush(childFields, state);
>
> // Important to save after asking consumer to flush so
> // consumer can alter the FieldInfo* if necessary. EG,
> @@ -74,6 +96,15 @@ final class DocFieldProcessor extends Do
>
> @Override
> public void abort() {
> + for(int i=0;i<fieldHash.length;i++) {
> + DocFieldProcessorPerField field = fieldHash[i];
> + while(field != null) {
> + final DocFieldProcessorPerField next = field.next;
> + field.abort();
> + field = next;
> + }
> + }
> +
> fieldsWriter.abort();
> consumer.abort();
> }
> @@ -82,9 +113,317 @@ final class DocFieldProcessor extends Do
> public boolean freeRAM() {
> return consumer.freeRAM();
> }
> +
> + public Collection<DocFieldConsumerPerField> fields() {
> + Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
> + for(int i=0;i<fieldHash.length;i++) {
> + DocFieldProcessorPerField field = fieldHash[i];
> + while(field != null) {
> + fields.add(field.consumer);
> + field = field.next;
> + }
> + }
> + assert fields.size() == totalFieldCount;
> + return fields;
> + }
> +
> + /** If there are fields we've seen but did not see again
> + * in the last run, then free them up. */
> +
> + void trimFields(SegmentWriteState state) {
> +
> + for(int i=0;i<fieldHash.length;i++) {
> + DocFieldProcessorPerField perField = fieldHash[i];
> + DocFieldProcessorPerField lastPerField = null;
> +
> + while (perField != null) {
> +
> + if (perField.lastGen == -1) {
> +
> + // This field was not seen since the previous
> + // flush, so, free up its resources now
> +
> + // Unhash
> + if (lastPerField == null)
> + fieldHash[i] = perField.next;
> + else
> + lastPerField.next = perField.next;
> +
> + if (state.infoStream != null) {
> + state.infoStream.println(" purge field=" + perField.fieldInfo.name);
> + }
> +
> + totalFieldCount--;
> +
> + } else {
> + // Reset
> + perField.lastGen = -1;
> + lastPerField = perField;
> + }
> +
> + perField = perField.next;
> + }
> + }
> + }
> +
> + private void rehash() {
> + final int newHashSize = (fieldHash.length*2);
> + assert newHashSize> fieldHash.length;
> +
> + final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
> +
> + // Rehash
> + int newHashMask = newHashSize-1;
> + for(int j=0;j<fieldHash.length;j++) {
> + DocFieldProcessorPerField fp0 = fieldHash[j];
> + while(fp0 != null) {
> + final int hashPos2 = fp0.fieldInfo.name.hashCode()& newHashMask;
> + DocFieldProcessorPerField nextFP0 = fp0.next;
> + fp0.next = newHashArray[hashPos2];
> + newHashArray[hashPos2] = fp0;
> + fp0 = nextFP0;
> + }
> + }
> +
> + fieldHash = newHashArray;
> + hashMask = newHashMask;
> + }
>
> @Override
> - public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
> - return new DocFieldProcessorPerThread(threadState, this);
> + public DocumentsWriterPerThread.DocWriter processDocument() throws IOException {
> +
> + consumer.startDocument();
> + fieldsWriter.startDocument();
> +
> + final Document doc = docState.doc;
> +
> + fieldCount = 0;
> +
> + final int thisFieldGen = fieldGen++;
> +
> + final List<Fieldable> docFields = doc.getFields();
> + final int numDocFields = docFields.size();
> +
> + // Absorb any new fields first seen in this document.
> + // Also absorb any changes to fields we had already
> + // seen before (eg suddenly turning on norms or
> + // vectors, etc.):
> +
> + for(int i=0;i<numDocFields;i++) {
> + Fieldable field = docFields.get(i);
> + final String fieldName = field.name();
> +
> + // Make sure we have a PerField allocated
> + final int hashPos = fieldName.hashCode()& hashMask;
> + DocFieldProcessorPerField fp = fieldHash[hashPos];
> + while(fp != null&& !fp.fieldInfo.name.equals(fieldName)) {
> + fp = fp.next;
> + }
> +
> + if (fp == null) {
> +
> + // TODO FI: we need to genericize the "flags" that a
> + // field holds, and, how these flags are merged; it
> + // needs to be more "pluggable" such that if I want
> + // to have a new "thing" my Fields can do, I can
> + // easily add it
> + FieldInfo fi = fieldInfos.add(fieldName, field.isIndexed(), field.isTermVectorStored(),
> + field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
> + field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
> +
> + fp = new DocFieldProcessorPerField(this, fi);
> + fp.next = fieldHash[hashPos];
> + fieldHash[hashPos] = fp;
> + totalFieldCount++;
> +
> + if (totalFieldCount>= fieldHash.length/2)
> + rehash();
> + } else {
> + fp.fieldInfo.update(field.isIndexed(), field.isTermVectorStored(),
> + field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
> + field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
> + }
> +
> + if (thisFieldGen != fp.lastGen) {
> +
> + // First time we're seeing this field for this doc
> + fp.fieldCount = 0;
> +
> + if (fieldCount == fields.length) {
> + final int newSize = fields.length*2;
> + DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
> + System.arraycopy(fields, 0, newArray, 0, fieldCount);
> + fields = newArray;
> + }
> +
> + fields[fieldCount++] = fp;
> + fp.lastGen = thisFieldGen;
> + }
> +
> + if (fp.fieldCount == fp.fields.length) {
> + Fieldable[] newArray = new Fieldable[fp.fields.length*2];
> + System.arraycopy(fp.fields, 0, newArray, 0, fp.fieldCount);
> + fp.fields = newArray;
> + }
> +
> + fp.fields[fp.fieldCount++] = field;
> + if (field.isStored()) {
> + fieldsWriter.addField(field, fp.fieldInfo);
> + }
> + }
> +
> + // If we are writing vectors then we must visit
> + // fields in sorted order so they are written in
> + // sorted order. TODO: we actually only need to
> + // sort the subset of fields that have vectors
> + // enabled; we could save [small amount of] CPU
> + // here.
> + quickSort(fields, 0, fieldCount-1);
> +
> + for(int i=0;i<fieldCount;i++)
> + fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
> +
> + if (docState.maxTermPrefix != null&& docState.infoStream != null) {
> + docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
> + docState.maxTermPrefix = null;
> + }
> +
> + final DocumentsWriterPerThread.DocWriter one = fieldsWriter.finishDocument();
> + final DocumentsWriterPerThread.DocWriter two = consumer.finishDocument();
> + if (one == null) {
> + return two;
> + } else if (two == null) {
> + return one;
> + } else {
> + PerDoc both = getPerDoc();
> + both.docID = docState.docID;
> + assert one.docID == docState.docID;
> + assert two.docID == docState.docID;
> + both.one = one;
> + both.two = two;
> + return both;
> + }
> + }
> +
> + void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
> + if (lo>= hi)
> + return;
> + else if (hi == 1+lo) {
> + if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name)> 0) {
> + final DocFieldProcessorPerField tmp = array[lo];
> + array[lo] = array[hi];
> + array[hi] = tmp;
> + }
> + return;
> + }
> +
> + int mid = (lo + hi)>>> 1;
> +
> + if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name)> 0) {
> + DocFieldProcessorPerField tmp = array[lo];
> + array[lo] = array[mid];
> + array[mid] = tmp;
> + }
> +
> + if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name)> 0) {
> + DocFieldProcessorPerField tmp = array[mid];
> + array[mid] = array[hi];
> + array[hi] = tmp;
> +
> + if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name)> 0) {
> + DocFieldProcessorPerField tmp2 = array[lo];
> + array[lo] = array[mid];
> + array[mid] = tmp2;
> + }
> + }
> +
> + int left = lo + 1;
> + int right = hi - 1;
> +
> + if (left>= right)
> + return;
> +
> + DocFieldProcessorPerField partition = array[mid];
> +
> + for (; ;) {
> + while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name)> 0)
> + --right;
> +
> + while (left< right&& array[left].fieldInfo.name.compareTo(partition.fieldInfo.name)<= 0)
> + ++left;
> +
> + if (left< right) {
> + DocFieldProcessorPerField tmp = array[left];
> + array[left] = array[right];
> + array[right] = tmp;
> + --right;
> + } else {
> + break;
> + }
> + }
> +
> + quickSort(array, lo, left);
> + quickSort(array, left + 1, hi);
> + }
> +
> + PerDoc[] docFreeList = new PerDoc[1];
> + int freeCount;
> + int allocCount;
> +
> + PerDoc getPerDoc() {
> + if (freeCount == 0) {
> + allocCount++;
> + if (allocCount> docFreeList.length) {
> + // Grow our free list up front to make sure we have
> + // enough space to recycle all outstanding PerDoc
> + // instances
> + assert allocCount == 1+docFreeList.length;
> + docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
> + }
> + return new PerDoc();
> + } else
> + return docFreeList[--freeCount];
> + }
> +
> + void freePerDoc(PerDoc perDoc) {
> + assert freeCount< docFreeList.length;
> + docFreeList[freeCount++] = perDoc;
> + }
> +
> + class PerDoc extends DocumentsWriterPerThread.DocWriter {
> +
> + DocumentsWriterPerThread.DocWriter one;
> + DocumentsWriterPerThread.DocWriter two;
> +
> + @Override
> + public long sizeInBytes() {
> + return one.sizeInBytes() + two.sizeInBytes();
> + }
> +
> + @Override
> + public void finish() throws IOException {
> + try {
> + try {
> + one.finish();
> + } finally {
> + two.finish();
> + }
> + } finally {
> + freePerDoc(this);
> + }
> + }
> +
> + @Override
> + public void abort() {
> + try {
> + try {
> + one.abort();
> + } finally {
> + two.abort();
> + }
> + } finally {
> + freePerDoc(this);
> + }
> + }
> }
> }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Wed Jul 21 10:27:20 2010
> @@ -34,8 +34,8 @@ final class DocFieldProcessorPerField {
> int fieldCount;
> Fieldable[] fields = new Fieldable[1];
>
> - public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
> - this.consumer = perThread.consumer.addField(fieldInfo);
> + public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
> + this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
> this.fieldInfo = fieldInfo;
> }
>
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Wed Jul 21 10:27:20 2010
> @@ -18,12 +18,13 @@ package org.apache.lucene.index;
> */
>
> import java.io.IOException;
> -import java.util.Collection;
> import java.util.HashMap;
> -import java.util.HashSet;
> -
> import java.util.Map;
>
> +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
> +import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
> +import org.apache.lucene.util.AttributeSource;
> +
>
> /** This is a DocFieldConsumer that inverts each field,
> * separately, from a Document, and accepts a
> @@ -34,7 +35,32 @@ final class DocInverter extends DocField
> final InvertedDocConsumer consumer;
> final InvertedDocEndConsumer endConsumer;
>
> - public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
> + final DocumentsWriterPerThread.DocState docState;
> +
> + final FieldInvertState fieldState = new FieldInvertState();
> +
> + final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
> +
> + static class SingleTokenAttributeSource extends AttributeSource {
> + final CharTermAttribute termAttribute;
> + final OffsetAttribute offsetAttribute;
> +
> + private SingleTokenAttributeSource() {
> + termAttribute = addAttribute(CharTermAttribute.class);
> + offsetAttribute = addAttribute(OffsetAttribute.class);
> + }
> +
> + public void reinit(String stringValue, int startOffset, int endOffset) {
> + termAttribute.setEmpty().append(stringValue);
> + offsetAttribute.setOffset(startOffset, endOffset);
> + }
> + }
> +
> + // Used to read a string value for a field
> + final ReusableStringReader stringReader = new ReusableStringReader();
> +
> + public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
> + this.docState = docState;
> this.consumer = consumer;
> this.endConsumer = endConsumer;
> }
> @@ -47,33 +73,37 @@ final class DocInverter extends DocField
> }
>
> @Override
> - void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
> -
> - Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
> - Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
> -
> - for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
> + void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
>
> + Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
> + Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
>
> - DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
> -
> - Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
> - Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
> - for (final DocFieldConsumerPerField field: entry.getValue() ) {
> - DocInverterPerField perField = (DocInverterPerField) field;
> - childFields.add(perField.consumer);
> - endChildFields.add(perField.endConsumer);
> - }
> -
> - childThreadsAndFields.put(perThread.consumer, childFields);
> - endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
> + for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
> + DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
> + childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
> + endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
> }
>
> - consumer.flush(childThreadsAndFields, state);
> - endConsumer.flush(endChildThreadsAndFields, state);
> + consumer.flush(childFieldsToFlush, state);
> + endConsumer.flush(endChildFieldsToFlush, state);
> + }
> +
> + @Override
> + public void startDocument() throws IOException {
> + consumer.startDocument();
> + endConsumer.startDocument();
> }
>
> @Override
> + public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
> + // TODO: allow endConsumer.finishDocument to also return
> + // a DocWriter
> + endConsumer.finishDocument();
> + return consumer.finishDocument();
> + }
> +
> +
> + @Override
> public void closeDocStore(SegmentWriteState state) throws IOException {
> consumer.closeDocStore(state);
> endConsumer.closeDocStore(state);
> @@ -81,17 +111,21 @@ final class DocInverter extends DocField
>
> @Override
> void abort() {
> - consumer.abort();
> - endConsumer.abort();
> + try {
> + consumer.abort();
> + } finally {
> + endConsumer.abort();
> + }
> }
>
> @Override
> public boolean freeRAM() {
> return consumer.freeRAM();
> }
> -
> +
> @Override
> - public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
> - return new DocInverterPerThread(docFieldProcessorPerThread, this);
> + public DocFieldConsumerPerField addField(FieldInfo fi) {
> + return new DocInverterPerField(this, fi);
> }
> +
> }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Wed Jul 21 10:27:20 2010
> @@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
>
> final class DocInverterPerField extends DocFieldConsumerPerField {
>
> - final private DocInverterPerThread perThread;
> - final private FieldInfo fieldInfo;
> + final private DocInverter parent;
> + final FieldInfo fieldInfo;
> final InvertedDocConsumerPerField consumer;
> final InvertedDocEndConsumerPerField endConsumer;
> - final DocumentsWriter.DocState docState;
> + final DocumentsWriterPerThread.DocState docState;
> final FieldInvertState fieldState;
>
> - public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
> - this.perThread = perThread;
> + public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
> + this.parent = parent;
> this.fieldInfo = fieldInfo;
> - docState = perThread.docState;
> - fieldState = perThread.fieldState;
> - this.consumer = perThread.consumer.addField(this, fieldInfo);
> - this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
> + docState = parent.docState;
> + fieldState = parent.fieldState;
> + this.consumer = parent.consumer.addField(this, fieldInfo);
> + this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
> }
>
> @Override
> @@ -84,8 +84,8 @@ final class DocInverterPerField extends
> if (!field.isTokenized()) { // un-tokenized field
> String stringValue = field.stringValue();
> final int valueLength = stringValue.length();
> - perThread.singleToken.reinit(stringValue, 0, valueLength);
> - fieldState.attributeSource = perThread.singleToken;
> + parent.singleToken.reinit(stringValue, 0, valueLength);
> + fieldState.attributeSource = parent.singleToken;
> consumer.start(field);
>
> boolean success = false;
> @@ -93,8 +93,9 @@ final class DocInverterPerField extends
> consumer.add();
> success = true;
> } finally {
> - if (!success)
> + if (!success) {
> docState.docWriter.setAborting();
> + }
> }
> fieldState.offset += valueLength;
> fieldState.length++;
> @@ -119,8 +120,8 @@ final class DocInverterPerField extends
> if (stringValue == null) {
> throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
> }
> - perThread.stringReader.init(stringValue);
> - reader = perThread.stringReader;
> + parent.stringReader.init(stringValue);
> + reader = parent.stringReader;
> }
>
> // Tokenize field and add to postingTable
> @@ -173,8 +174,9 @@ final class DocInverterPerField extends
> consumer.add();
> success = true;
> } finally {
> - if (!success)
> + if (!success) {
> docState.docWriter.setAborting();
> + }
> }
> fieldState.position++;
> + if (++fieldState.length >= maxFieldLength) {
> @@ -208,4 +210,9 @@ final class DocInverterPerField extends
> consumer.finish();
> endConsumer.finish();
> }
> +
> + @Override
> + FieldInfo getFieldInfo() {
> + return this.fieldInfo;
> + }
> }
>
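A small stand-alone sketch of the single-token trick DocInverterPerField uses above for
un-tokenized fields: one reusable AttributeSource is re-initialized per value instead of
running an analyzer. The attribute classes are the real Lucene APIs already imported in
DocInverter; the variable names and the sample value are invented for illustration.

    import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
    import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
    import org.apache.lucene.util.AttributeSource;

    AttributeSource singleToken = new AttributeSource();
    CharTermAttribute term = singleToken.addAttribute(CharTermAttribute.class);
    OffsetAttribute offset = singleToken.addAttribute(OffsetAttribute.class);

    String value = "some-untokenized-value";    // hypothetical field.stringValue()
    term.setEmpty().append(value);              // reuse the same attributes for every doc
    offset.setOffset(0, value.length());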
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,459 @@
> +package org.apache.lucene.index;
> +
> +import java.io.IOException;
> +import java.io.PrintStream;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.lucene.analysis.Analyzer;
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.index.codecs.Codec;
> +import org.apache.lucene.search.Similarity;
> +import org.apache.lucene.store.Directory;
> +import org.apache.lucene.store.RAMFile;
> +import org.apache.lucene.util.ArrayUtil;
> +
> +public class DocumentsWriterPerThread {
> +
> + /**
> + * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
> + * which returns the DocConsumer that the DocumentsWriterPerThread calls to process the
> + * documents.
> + */
> + abstract static class IndexingChain {
> + abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
> + }
> +
> +
> + static final IndexingChain defaultIndexingChain = new IndexingChain() {
> +
> + @Override
> + DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
> + /*
> + This is the current indexing chain:
> +
> + DocConsumer / DocConsumerPerThread
> + --> code: DocFieldProcessor / DocFieldProcessorPerThread
> + --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
> + --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
> + --> code: DocInverter / DocInverterPerThread / DocInverterPerField
> + --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
> + --> code: TermsHash / TermsHashPerThread / TermsHashPerField
> + --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
> + --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
> + --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
> + --> InvertedDocEndConsumer / InvertedDocEndConsumerPerThread / InvertedDocEndConsumerPerField
> + --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
> + --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
> + */
> +
> + // Build up indexing chain:
> +
> + final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
> + final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
> +
> + final InvertedDocConsumer termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
> + new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
> + final NormsWriter normsWriter = new NormsWriter();
> + final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
> + return new DocFieldProcessor(documentsWriterPerThread, docInverter);
> + }
> + };
> +
> + static class DocState {
> + final DocumentsWriterPerThread docWriter;
> + Analyzer analyzer;
> + int maxFieldLength;
> + PrintStream infoStream;
> + Similarity similarity;
> + int docID;
> + Document doc;
> + String maxTermPrefix;
> +
> + DocState(DocumentsWriterPerThread docWriter) {
> + this.docWriter = docWriter;
> + }
> +
> + // Only called by asserts
> + public boolean testPoint(String name) {
> + return docWriter.writer.testPoint(name);
> + }
> + }
> +
> + /** Called if we hit an exception at a bad time (when
> + * updating the index files) and must discard all
> + * currently buffered docs. This resets our state,
> + * discarding any docs added since last flush. */
> + void abort() throws IOException {
> + try {
> + if (infoStream != null) {
> + message("docWriter: now abort");
> + }
> + try {
> + consumer.abort();
> + } catch (Throwable t) {
> + }
> +
> + docStoreSegment = null;
> + numDocsInStore = 0;
> + docStoreOffset = 0;
> +
> + // Reset all postings data
> + doAfterFlush();
> +
> + } finally {
> + aborting = false;
> + if (infoStream != null) {
> + message("docWriter: done abort");
> + }
> + }
> + }
> +
> +
> + final DocumentsWriterRAMAllocator ramAllocator = new DocumentsWriterRAMAllocator();
> +
> + final DocumentsWriter parent;
> + final IndexWriter writer;
> +
> + final Directory directory;
> + final DocState docState;
> + final DocConsumer consumer;
> + private DocFieldProcessor docFieldProcessor;
> +
> + String segment; // Current segment we are working on
> + private String docStoreSegment; // Current doc-store segment we are writing
> + private int docStoreOffset; // Current starting doc-store offset of current segment
> + boolean aborting; // True if an abort is pending
> +
> + private final PrintStream infoStream;
> + private int numDocsInRAM;
> + private int numDocsInStore;
> + private int flushedDocCount;
> + SegmentWriteState flushState;
> +
> + long[] sequenceIDs = new long[8];
> +
> + final List<String> closedFiles = new ArrayList<String>();
> +
> + long numBytesUsed;
> +
> + public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
> + this.directory = directory;
> + this.parent = parent;
> + this.writer = parent.indexWriter;
> + this.infoStream = parent.indexWriter.getInfoStream();
> + this.docState = new DocState(this);
> + this.docState.similarity = parent.config.getSimilarity();
> + this.docState.maxFieldLength = parent.config.getMaxFieldLength();
> +
> + consumer = indexingChain.getChain(this);
> + if (consumer instanceof DocFieldProcessor) {
> + docFieldProcessor = (DocFieldProcessor) consumer;
> + }
> +
> + }
> +
> + void setAborting() {
> + aborting = true;
> + }
> +
> + public void addDocument(Document doc, Analyzer analyzer) throws IOException {
> + docState.doc = doc;
> + docState.analyzer = analyzer;
> + docState.docID = numDocsInRAM;
> + initSegmentName(false);
> +
> + final DocWriter perDoc;
> +
> + boolean success = false;
> + try {
> + perDoc = consumer.processDocument();
> +
> + success = true;
> + } finally {
> + if (!success) {
> + if (!aborting) {
> + // mark document as deleted
> + commitDocument(-1);
> + }
> + }
> + }
> +
> + success = false;
> + try {
> + if (perDoc != null) {
> + perDoc.finish();
> + }
> +
> + success = true;
> + } finally {
> + if (!success) {
> + setAborting();
> + }
> + }
> +
> + }
> +
> + public void commitDocument(long sequenceID) {
> + if (numDocsInRAM == sequenceIDs.length) {
> + sequenceIDs = ArrayUtil.grow(sequenceIDs);
> + }
> +
> + sequenceIDs[numDocsInRAM] = sequenceID;
> + numDocsInRAM++;
> + numDocsInStore++;
> + }
> +
> + int getNumDocsInRAM() {
> + return numDocsInRAM;
> + }
> +
> + long getMinSequenceID() {
> + if (numDocsInRAM == 0) {
> + return -1;
> + }
> + return sequenceIDs[0];
> + }
> +
> + /** Returns true if any of the fields in the current
> + * buffered docs have omitTermFreqAndPositions==false */
> + boolean hasProx() {
> + return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
> + : true;
> + }
> +
> + Codec getCodec() {
> + return flushState.codec;
> + }
> +
> + void initSegmentName(boolean onlyDocStore) {
> + if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
> + // this call is synchronized on IndexWriter.segmentInfos
> + segment = writer.newSegmentName();
> + assert numDocsInRAM == 0;
> + }
> + if (docStoreSegment == null) {
> + docStoreSegment = segment;
> + assert numDocsInStore == 0;
> + }
> + }
> +
> +
> + private void initFlushState(boolean onlyDocStore) {
> + initSegmentName(onlyDocStore);
> + flushState = new SegmentWriteState(infoStream, directory, segment, docFieldProcessor.fieldInfos,
> + docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
> + writer.codecs);
> + }
> +
> + /** Reset after a flush */
> + private void doAfterFlush() throws IOException {
> + segment = null;
> + numDocsInRAM = 0;
> + }
> +
> + /** Flush all pending docs to a new segment */
> + SegmentInfo flush(boolean closeDocStore) throws IOException {
> + assert numDocsInRAM > 0;
> +
> + initFlushState(closeDocStore);
> +
> + docStoreOffset = numDocsInStore;
> +
> + if (infoStream != null) {
> + message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
> + }
> +
> + boolean success = false;
> +
> + try {
> +
> + if (closeDocStore) {
> + assert flushState.docStoreSegmentName != null;
> + assert flushState.docStoreSegmentName.equals(flushState.segmentName);
> + closeDocStore();
> + flushState.numDocsInStore = 0;
> + }
> +
> + consumer.flush(flushState);
> +
> + if (infoStream != null) {
> + SegmentInfo si = new SegmentInfo(flushState.segmentName,
> + flushState.numDocs,
> + directory, false,
> + docStoreOffset, flushState.docStoreSegmentName,
> + false,
> + hasProx(),
> + getCodec());
> +
> + final long newSegmentSize = si.sizeInBytes();
> + String message = " ramUsed=" + ramAllocator.nf.format(((double) numBytesUsed)/1024./1024.) + " MB" +
> + " newFlushedSize=" + newSegmentSize +
> + " docs/MB=" + ramAllocator.nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
> + " new/old=" + ramAllocator.nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
> + message(message);
> + }
> +
> + flushedDocCount += flushState.numDocs;
> +
> + long maxSequenceID = sequenceIDs[numDocsInRAM-1];
> + doAfterFlush();
> +
> + // Create new SegmentInfo, but do not add to our
> + // segmentInfos until deletes are flushed
> + // successfully.
> + SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
> + flushState.numDocs,
> + directory, false,
> + docStoreOffset, flushState.docStoreSegmentName,
> + false,
> + hasProx(),
> + getCodec());
> +
> +
> + newSegment.setMinSequenceID(sequenceIDs[0]);
> + newSegment.setMaxSequenceID(maxSequenceID);
> +
> + IndexWriter.setDiagnostics(newSegment, "flush");
> + success = true;
> +
> + return newSegment;
> + } finally {
> + if (!success) {
> + setAborting();
> + }
> + }
> + }
> +
> + /** Closes the current open doc stores and returns the doc
> + * store segment name. This returns null if there are
> + * no buffered documents. */
> + String closeDocStore() throws IOException {
> +
> + // nocommit
> +// if (infoStream != null)
> +// message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
> +
> + boolean success = false;
> +
> + try {
> + initFlushState(true);
> + closedFiles.clear();
> +
> + consumer.closeDocStore(flushState);
> + // nocommit
> + //assert 0 == openFiles.size();
> +
> + String s = docStoreSegment;
> + docStoreSegment = null;
> + docStoreOffset = 0;
> + numDocsInStore = 0;
> + success = true;
> + return s;
> + } finally {
> + if (!success) {
> + parent.abort();
> + }
> + }
> + }
> +
> +
> + /** Get current segment name we are writing. */
> + String getSegment() {
> + return segment;
> + }
> +
> + /** Returns the current doc store segment we are writing
> + * to. */
> + String getDocStoreSegment() {
> + return docStoreSegment;
> + }
> +
> + /** Returns the doc offset into the shared doc store for
> + * the current buffered docs. */
> + int getDocStoreOffset() {
> + return docStoreOffset;
> + }
> +
> +
> + @SuppressWarnings("unchecked")
> + List<String> closedFiles() {
> + return (List<String>) ((ArrayList<String>) closedFiles).clone();
> + }
> +
> + void addOpenFile(String name) {
> + synchronized(parent.openFiles) {
> + assert !parent.openFiles.contains(name);
> + parent.openFiles.add(name);
> + }
> + }
> +
> + void removeOpenFile(String name) {
> + synchronized(parent.openFiles) {
> + assert parent.openFiles.contains(name);
> + parent.openFiles.remove(name);
> + }
> + closedFiles.add(name);
> + }
> +
> + /** Consumer returns this on each doc. This holds any
> + * state that must be flushed synchronized "in docID
> + * order". We gather these and flush them in order. */
> + abstract static class DocWriter {
> + DocWriter next;
> + int docID;
> + abstract void finish() throws IOException;
> + abstract void abort();
> + abstract long sizeInBytes();
> +
> + void setNext(DocWriter next) {
> + this.next = next;
> + }
> + }
> +
> + /**
> + * Create and return a new PerDocBuffer.
> + */
> + PerDocBuffer newPerDocBuffer() {
> + return new PerDocBuffer();
> + }
> +
> + /**
> + * RAMFile buffer for DocWriters.
> + */
> + class PerDocBuffer extends RAMFile {
> +
> + /**
> + * Allocate bytes used from shared pool.
> + */
> + protected byte[] newBuffer(int size) {
> + assert size == DocumentsWriterRAMAllocator.PER_DOC_BLOCK_SIZE;
> + return ramAllocator.perDocAllocator.getByteBlock();
> + }
> +
> + /**
> + * Recycle the bytes used.
> + */
> + synchronized void recycle() {
> + if (buffers.size() > 0) {
> + setLength(0);
> +
> + // Recycle the blocks
> + ramAllocator.perDocAllocator.recycleByteBlocks(buffers);
> + buffers.clear();
> + sizeInBytes = 0;
> +
> + assert numBuffers() == 0;
> + }
> + }
> + }
> +
> + void bytesUsed(long numBytes) {
> + ramAllocator.bytesUsed(numBytes);
> + }
> +
> + void message(String message) {
> + if (infoStream != null)
> + writer.message("DW: " + message);
> + }
> +}
>
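A hedged sketch of how the IndexingChain extension point above might be used. The chain
below only delegates to defaultIndexingChain; a real chain would assemble its own
DocConsumer tree the way defaultIndexingChain does. Because IndexingChain and DocConsumer
are package-private, such code would have to live in org.apache.lucene.index; the field
name is hypothetical.

    static final DocumentsWriterPerThread.IndexingChain delegatingChain =
        new DocumentsWriterPerThread.IndexingChain() {
          @Override
          DocConsumer getChain(DocumentsWriterPerThread perThread) {
            // delegate to the default chain; a custom chain would build
            // its own TermsHash/DocInverter/DocFieldProcessor stack here
            return DocumentsWriterPerThread.defaultIndexingChain.getChain(perThread);
          }
        };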
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,148 @@
> +package org.apache.lucene.index;
> +
> +import java.text.NumberFormat;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.lucene.util.Constants;
> +
> +class DocumentsWriterRAMAllocator {
> + final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
> + final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
> +
> +
> + class ByteBlockAllocator extends ByteBlockPool.Allocator {
> + final int blockSize;
> +
> + ByteBlockAllocator(int blockSize) {
> + this.blockSize = blockSize;
> + }
> +
> + ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
> +
> + /* Allocate another byte[] from the shared pool */
> + @Override
> + byte[] getByteBlock() {
> + final int size = freeByteBlocks.size();
> + final byte[] b;
> + if (0 == size) {
> + b = new byte[blockSize];
> + // Always record a block allocated, even if
> + // trackAllocations is false. This is necessary
> + // because this block will be shared between
> + // things that don't track allocations (term
> + // vectors) and things that do (freq/prox
> + // postings).
> + numBytesUsed += blockSize;
> + } else
> + b = freeByteBlocks.remove(size-1);
> + return b;
> + }
> +
> + /* Return byte[]'s to the pool */
> + @Override
> + void recycleByteBlocks(byte[][] blocks, int start, int end) {
> + for(int i=start;i<end;i++) {
> + freeByteBlocks.add(blocks[i]);
> + }
> + }
> +
> + @Override
> + void recycleByteBlocks(List<byte[]> blocks) {
> + final int size = blocks.size();
> + for(int i=0;i<size;i++) {
> + freeByteBlocks.add(blocks.get(i));
> + }
> + }
> + }
> +
> + private ArrayList<int[]> freeIntBlocks = new ArrayList<int[]>();
> +
> + /* Allocate another int[] from the shared pool */
> + int[] getIntBlock() {
> + final int size = freeIntBlocks.size();
> + final int[] b;
> + if (0 == size) {
> + b = new int[INT_BLOCK_SIZE];
> + // Always record a block allocated, even if
> + // trackAllocations is false. This is necessary
> + // because this block will be shared between
> + // things that don't track allocations (term
> + // vectors) and things that do (freq/prox
> + // postings).
> + numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
> + } else
> + b = freeIntBlocks.remove(size-1);
> + return b;
> + }
> +
> + void bytesUsed(long numBytes) {
> + numBytesUsed += numBytes;
> + }
> +
> + /* Return int[]s to the pool */
> + void recycleIntBlocks(int[][] blocks, int start, int end) {
> + for(int i=start;i<end;i++)
> + freeIntBlocks.add(blocks[i]);
> + }
> +
> + long getRAMUsed() {
> + return numBytesUsed;
> + }
> +
> + long numBytesUsed;
> +
> + NumberFormat nf = NumberFormat.getInstance();
> +
> + final static int PER_DOC_BLOCK_SIZE = 1024;
> +
> + // Coarse estimates used to measure RAM usage of buffered deletes
> + final static int OBJECT_HEADER_BYTES = 8;
> + final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
> + final static int INT_NUM_BYTE = 4;
> + final static int CHAR_NUM_BYTE = 2;
> +
> + /* Rough logic: HashMap has an array[Entry] w/ varying
> + load factor (say 2 * POINTER). Entry is object w/ Term
> + key, BufferedDeletes.Num val, int hash, Entry next
> + (OBJ_HEADER + 3*POINTER + INT). Term is object w/
> + String field and String text (OBJ_HEADER + 2*POINTER).
> + We don't count Term's field since it's interned.
> + Term's text is String (OBJ_HEADER + 4*INT + POINTER +
> + OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is
> + OBJ_HEADER + INT. */
> +
> + final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
> +
> + /* Rough logic: del docIDs are List<Integer>. Say list
> + allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
> + + int */
> + final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
> +
> + /* Rough logic: HashMap has an array[Entry] w/ varying
> + load factor (say 2 * POINTER). Entry is object w/
> + Query key, Integer val, int hash, Entry next
> + (OBJ_HEADER + 3*POINTER + INT). Query we often
> + undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
> + final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
> +
> + /* Initial chunks size of the shared byte[] blocks used to
> + store postings data */
> + final static int BYTE_BLOCK_SHIFT = 15;
> + final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
> + final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
> + final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
> +
> + final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
> +
> + /* Initial chunks size of the shared int[] blocks used to
> + store postings data */
> + final static int INT_BLOCK_SHIFT = 13;
> + final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
> + final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
> +
> + String toMB(long v) {
> + return nf.format(v/1024./1024.);
> + }
> +
> +}
>
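A hedged usage sketch of the block pooling above, assuming code in org.apache.lucene.index
with access to the package-private allocator. It borrows one per-document block
(PER_DOC_BLOCK_SIZE = 1024 bytes) and recycles it, so numBytesUsed grows only for
genuinely new allocations.

    import java.util.ArrayList;
    import java.util.List;

    DocumentsWriterRAMAllocator alloc = new DocumentsWriterRAMAllocator();

    byte[] block = alloc.perDocAllocator.getByteBlock();   // new 1 KB block, counted in numBytesUsed

    // ... write per-document bytes into block ...

    List<byte[]> spent = new ArrayList<byte[]>();
    spent.add(block);
    alloc.perDocAllocator.recycleByteBlocks(spent);        // returned to the free list

    byte[] again = alloc.perDocAllocator.getByteBlock();   // reuses the recycled block; no new RAM counted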
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,255 @@
> +package org.apache.lucene.index;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.concurrent.locks.Condition;
> +import java.util.concurrent.locks.Lock;
> +import java.util.concurrent.locks.ReentrantLock;
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.util.ThreadInterruptedException;
> +
> +abstract class DocumentsWriterThreadPool {
> + public static abstract class Task<T> {
> + private boolean clearThreadBindings = false;
> +
> + protected void clearThreadBindings() {
> + this.clearThreadBindings = true;
> + }
> +
> + boolean doClearThreadBindings() {
> + return clearThreadBindings;
> + }
> + }
> +
> + public static abstract class PerThreadTask<T> extends Task<T> {
> + abstract T process(final DocumentsWriterPerThread perThread) throws IOException;
> + }
> +
> + public static abstract class AllThreadsTask<T> extends Task<T> {
> + abstract T process(final Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException;
> + }
> +
> + protected abstract static class ThreadState {
> + private DocumentsWriterPerThread perThread;
> + private boolean isIdle = true;
> +
> + void start() {/* extension hook */}
> + void finish() {/* extension hook */}
> + }
> +
> + private int pauseThreads = 0;
> +
> + protected final int maxNumThreadStates;
> + protected ThreadState[] allThreadStates = new ThreadState[0];
> +
> + private final Lock lock = new ReentrantLock();
> + private final Condition threadStateAvailable = lock.newCondition();
> + private boolean globalLock;
> + private boolean aborting;
> +
> + DocumentsWriterThreadPool(int maxNumThreadStates) {
> + this.maxNumThreadStates = (maxNumThreadStates < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
> + }
> +
> + public final int getMaxThreadStates() {
> + return this.maxNumThreadStates;
> + }
> +
> + void pauseAllThreads() {
> + lock.lock();
> + try {
> + pauseThreads++;
> + while(!allThreadsIdle()) {
> + try {
> + threadStateAvailable.await();
> + } catch (InterruptedException ie) {
> + throw new ThreadInterruptedException(ie);
> + }
> + }
> + } finally {
> + lock.unlock();
> + }
> + }
> +
> + void resumeAllThreads() {
> + lock.lock();
> + try {
> + pauseThreads--;
> + assert pauseThreads >= 0;
> + if (0 == pauseThreads) {
> + threadStateAvailable.signalAll();
> + }
> + } finally {
> + lock.unlock();
> + }
> + }
> +
> + private boolean allThreadsIdle() {
> + for (ThreadState state : allThreadStates) {
> + if (!state.isIdle) {
> + return false;
> + }
> + }
> +
> + return true;
> + }
> +
> + void abort() throws IOException {
> + pauseAllThreads();
> + aborting = true;
> + for (ThreadState state : allThreadStates) {
> + state.perThread.abort();
> + }
> + }
> +
> + void finishAbort() {
> + aborting = false;
> + resumeAllThreads();
> + }
> +
> + public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
> + T result = null;
> +
> + lock.lock();
> + try {
> + try {
> + while (globalLock) {
> + threadStateAvailable.await();
> + }
> + } catch (InterruptedException ie) {
> + throw new ThreadInterruptedException(ie);
> + }
> +
> + globalLock = true;
> + pauseAllThreads();
> + } finally {
> + lock.unlock();
> + }
> +
> +
> + // all threads are idle now
> +
> + try {
> + final ThreadState[] localAllThreads = allThreadStates;
> +
> + result = task.process(new Iterator<DocumentsWriterPerThread>() {
> + int i = 0;
> +
> + @Override
> + public boolean hasNext() {
> + return i < localAllThreads.length;
> + }
> +
> + @Override
> + public DocumentsWriterPerThread next() {
> + return localAllThreads[i++].perThread;
> + }
> +
> + @Override
> + public void remove() {
> + throw new UnsupportedOperationException("remove() not supported.");
> + }
> + });
> + return result;
> + } finally {
> + lock.lock();
> + try {
> + try {
> + if (task.doClearThreadBindings()) {
> + clearAllThreadBindings();
> + }
> + } finally {
> + globalLock = false;
> + resumeAllThreads();
> + threadStateAvailable.signalAll();
> + }
> + } finally {
> + lock.unlock();
> + }
> +
> + }
> + }
> +
> +
> + public final <T> T executePerThread(DocumentsWriter documentsWriter, Document doc, PerThreadTask<T> task) throws IOException {
> + ThreadState state = acquireThreadState(documentsWriter, doc);
> + boolean success = false;
> + try {
> + T result = task.process(state.perThread);
> + success = true;
> + return result;
> + } finally {
> + boolean abort = false;
> + if (!success && state.perThread.aborting) {
> + state.perThread.aborting = false;
> + abort = true;
> + }
> +
> + returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
> +
> + if (abort) {
> + documentsWriter.abort();
> + }
> + }
> + }
> +
> + protected final <T extends ThreadState> T addNewThreadState(DocumentsWriter documentsWriter, T threadState) {
> + // Just create a new "private" thread state
> + ThreadState[] newArray = new ThreadState[1+allThreadStates.length];
> + if (allThreadStates.length > 0)
> + System.arraycopy(allThreadStates, 0, newArray, 0, allThreadStates.length);
> + threadState.perThread = documentsWriter.newDocumentsWriterPerThread();
> + newArray[allThreadStates.length] = threadState;
> +
> + allThreadStates = newArray;
> + return threadState;
> + }
> +
> + protected abstract ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
> + protected void clearThreadBindings(ThreadState flushedThread) {
> + // subclasses can optionally override this to cleanup after a thread flushed
> + }
> +
> + protected void clearAllThreadBindings() {
> + // subclasses can optionally override this to cleanup after a thread flushed
> + }
> +
> +
> + private final ThreadState acquireThreadState(DocumentsWriter documentsWriter, Document doc) {
> + lock.lock();
> + try {
> + ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
> +
> + try {
> + while (!threadState.isIdle || globalLock || aborting) {
> + threadStateAvailable.await();
> + }
> + } catch (InterruptedException ie) {
> + throw new ThreadInterruptedException(ie);
> + }
> +
> + threadState.isIdle = false;
> + threadState.start();
> +
> + return threadState;
> +
> + } finally {
> + lock.unlock();
> + }
> + }
> +
> + private final void returnDocumentsWriterPerThread(ThreadState state, boolean clearThreadBindings) {
> + lock.lock();
> + try {
> + state.finish();
> + if (clearThreadBindings) {
> + clearThreadBindings(state);
> + }
> + state.isIdle = true;
> + threadStateAvailable.signalAll();
> + } finally {
> + lock.unlock();
> + }
> + }
> +}
>
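A hedged sketch of how a caller inside DocumentsWriter might drive the pool above via
executePerThread. threadPool, documentsWriter, doc, analyzer and sequenceID are assumed to
be final locals in the caller's scope (the pool instance would be a concrete
DocumentsWriterThreadPool subclass), and the commit protocol is simplified for illustration.

    // doc, analyzer and sequenceID are assumed to be final and in scope
    long minSeqID = threadPool.executePerThread(documentsWriter, doc,
        new DocumentsWriterThreadPool.PerThreadTask<Long>() {
          @Override
          Long process(DocumentsWriterPerThread perThread) throws IOException {
            perThread.addDocument(doc, analyzer);   // buffer into this thread's private segment
            perThread.commitDocument(sequenceID);   // record the doc's sequence id
            return perThread.getMinSequenceID();
          }
        });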
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org