You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2011/05/09 17:24:23 UTC
svn commit: r1101062 [7/21] - in /lucene/dev/branches/bulkpostings: ./
dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/
dev-tools/idea/lucene/contrib/ant/ dev-tools/idea/lucene/contrib/db/bdb-je/
dev-tools/idea/lucene/contrib/db/bdb/ dev-tools/idea/...
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Mon May 9 15:24:04 2011
@@ -18,7 +18,7 @@ package org.apache.lucene.index;
*/
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.DocumentsWriter.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.search.IndexSearcher;
@@ -41,7 +41,7 @@ import org.apache.lucene.util.Version;
* IndexWriterConfig conf = new IndexWriterConfig(analyzer);
* conf.setter1().setter2();
* </pre>
- *
+ *
* @since 3.1
*/
public final class IndexWriterConfig implements Cloneable {
@@ -56,7 +56,7 @@ public final class IndexWriterConfig imp
* </ul>
*/
public static enum OpenMode { CREATE, APPEND, CREATE_OR_APPEND }
-
+
/** Default value is 32. Change using {@link #setTermIndexInterval(int)}. */
public static final int DEFAULT_TERM_INDEX_INTERVAL = 32; // TODO: this should be private to the codec, not settable here
@@ -77,23 +77,19 @@ public final class IndexWriterConfig imp
/**
* Default value for the write lock timeout (1,000 ms).
- *
+ *
* @see #setDefaultWriteLockTimeout(long)
*/
public static long WRITE_LOCK_TIMEOUT = 1000;
- /** The maximum number of simultaneous threads that may be
- * indexing documents at once in IndexWriter; if more
- * than this many threads arrive they will wait for
- * others to finish. */
- public final static int DEFAULT_MAX_THREAD_STATES = 8;
-
/** Default setting for {@link #setReaderPooling}. */
public final static boolean DEFAULT_READER_POOLING = false;
/** Default value is 1. Change using {@link #setReaderTermsIndexDivisor(int)}. */
public static final int DEFAULT_READER_TERMS_INDEX_DIVISOR = IndexReader.DEFAULT_TERMS_INDEX_DIVISOR;
+ /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
+ public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
/**
* Sets the default (for any instance) maximum time to wait for a write lock
* (in milliseconds).
@@ -105,7 +101,7 @@ public final class IndexWriterConfig imp
/**
* Returns the default write lock timeout for newly instantiated
* IndexWriterConfigs.
- *
+ *
* @see #setDefaultWriteLockTimeout(long)
*/
public static long getDefaultWriteLockTimeout() {
@@ -127,10 +123,12 @@ public final class IndexWriterConfig imp
private volatile IndexReaderWarmer mergedSegmentWarmer;
private volatile CodecProvider codecProvider;
private volatile MergePolicy mergePolicy;
- private volatile int maxThreadStates;
+ private volatile DocumentsWriterPerThreadPool indexerThreadPool;
private volatile boolean readerPooling;
private volatile int readerTermsIndexDivisor;
-
+ private volatile FlushPolicy flushPolicy;
+ private volatile int perThreadHardLimitMB;
+
private Version matchVersion;
/**
@@ -153,15 +151,16 @@ public final class IndexWriterConfig imp
maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
ramBufferSizeMB = DEFAULT_RAM_BUFFER_SIZE_MB;
maxBufferedDocs = DEFAULT_MAX_BUFFERED_DOCS;
- indexingChain = DocumentsWriter.defaultIndexingChain;
+ indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
mergedSegmentWarmer = null;
codecProvider = CodecProvider.getDefault();
- mergePolicy = new LogByteSizeMergePolicy();
- maxThreadStates = DEFAULT_MAX_THREAD_STATES;
+ mergePolicy = new TieredMergePolicy();
readerPooling = DEFAULT_READER_POOLING;
+ indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
+ perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
-
+
@Override
public Object clone() {
// Shallow clone is the only thing that's possible, since parameters like
@@ -186,7 +185,7 @@ public final class IndexWriterConfig imp
this.openMode = openMode;
return this;
}
-
+
/** Returns the {@link OpenMode} set by {@link #setOpenMode(OpenMode)}. */
public OpenMode getOpenMode() {
return openMode;
@@ -261,7 +260,7 @@ public final class IndexWriterConfig imp
public SimilarityProvider getSimilarityProvider() {
return similarityProvider;
}
-
+
/**
* Expert: set the interval between indexed terms. Large values cause less
* memory to be used by IndexReader, but slow random-access to terms. Small
@@ -281,7 +280,7 @@ public final class IndexWriterConfig imp
* In particular, <code>numUniqueTerms/interval</code> terms are read into
* memory by an IndexReader, and, on average, <code>interval/2</code> terms
* must be scanned for each random term access.
- *
+ *
* @see #DEFAULT_TERM_INDEX_INTERVAL
*
* <p>Takes effect immediately, but only applies to newly
@@ -293,7 +292,7 @@ public final class IndexWriterConfig imp
/**
* Returns the interval between indexed terms.
- *
+ *
* @see #setTermIndexInterval(int)
*/
public int getTermIndexInterval() { // TODO: this should be private to the codec, not settable here
@@ -331,10 +330,10 @@ public final class IndexWriterConfig imp
this.writeLockTimeout = writeLockTimeout;
return this;
}
-
+
/**
* Returns allowed timeout when acquiring the write lock.
- *
+ *
* @see #setWriteLockTimeout(long)
*/
public long getWriteLockTimeout() {
@@ -343,15 +342,16 @@ public final class IndexWriterConfig imp
/**
* Determines the minimal number of delete terms required before the buffered
- * in-memory delete terms are applied and flushed. If there are documents
- * buffered in memory at the time, they are merged and a new segment is
- * created.
-
- * <p>Disabled by default (writer flushes by RAM usage).
+ * in-memory delete terms and queries are applied and flushed.
+ * <p>Disabled by default (writer flushes by RAM usage).</p>
+ * <p>
+ * NOTE: This setting won't trigger a segment flush.
+ * </p>
*
* @throws IllegalArgumentException if maxBufferedDeleteTerms
* is enabled but smaller than 1
* @see #setRAMBufferSizeMB
+ * @see #setFlushPolicy(FlushPolicy)
*
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
@@ -366,9 +366,9 @@ public final class IndexWriterConfig imp
}
/**
- * Returns the number of buffered deleted terms that will trigger a flush if
- * enabled.
- *
+ * Returns the number of buffered deleted terms that will trigger a flush of all
+ * buffered deletes if enabled.
+ *
* @see #setMaxBufferedDeleteTerms(int)
*/
public int getMaxBufferedDeleteTerms() {
@@ -380,45 +380,50 @@ public final class IndexWriterConfig imp
* and deletions before they are flushed to the Directory. Generally for
* faster indexing performance it's best to flush by RAM usage instead of
* document count and use as large a RAM buffer as you can.
- *
* <p>
* When this is set, the writer will flush whenever buffered documents and
* deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
* triggering a flush due to RAM usage. Note that if flushing by document
* count is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ * <p>
+ * The maximum RAM limit is inherently determined by the JVM's available memory.
+ * Yet, an {@link IndexWriter} session can consume a significantly larger amount
+ * of memory than the given RAM limit since this limit is just an indicator when
to flush memory resident documents to the Directory. Flushes are likely to happen
concurrently while other threads are adding documents to the writer. For application
+ * stability the available memory in the JVM should be significantly larger than
+ * the RAM buffer used for indexing.
* <p>
* <b>NOTE</b>: the account of RAM usage for pending deletions is only
* approximate. Specifically, if you delete by Query, Lucene currently has no
* way to measure the RAM usage of individual Queries so the accounting will
* under-estimate and you should compensate by either calling commit()
* periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
- * to flush by count instead of RAM usage (each buffered delete Query counts
- * as one).
- *
+ * to flush and apply buffered deletes by count instead of RAM usage
+ * (for each buffered delete Query a constant number of bytes is used to estimate
+ * RAM usage). Note that enabling {@link #setMaxBufferedDeleteTerms(int)} will
+ * not trigger any segment flushes.
+ * <p>
+ * <b>NOTE</b>: It's not guaranteed that all memory resident documents are flushed
+ * once this limit is exceeded. Depending on the configured {@link FlushPolicy} only a
+ * subset of the buffered documents are flushed and therefore only part of the RAM
+ * buffer is released.
* <p>
- * <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
- * internal storage, the absolute maximum value for this setting is somewhat
- * less than 2048 MB. The precise limit depends on various factors, such as
- * how large your documents are, how many fields have norms, etc., so it's
- * best to set this value comfortably under 2048.
*
- * <p>
* The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
- *
+ * @see #setFlushPolicy(FlushPolicy)
+ * @see #setRAMPerThreadHardLimitMB(int)
+ *
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
*
* @throws IllegalArgumentException
* if ramBufferSize is enabled but non-positive, or it disables
* ramBufferSize when maxBufferedDocs is already disabled
+ *
*/
public IndexWriterConfig setRAMBufferSizeMB(double ramBufferSizeMB) {
- if (ramBufferSizeMB > 2048.0) {
- throw new IllegalArgumentException("ramBufferSize " + ramBufferSizeMB
- + " is too large; should be comfortably less than 2048");
- }
if (ramBufferSizeMB != DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0)
throw new IllegalArgumentException(
"ramBufferSize should be > 0.0 MB when enabled");
@@ -438,22 +443,22 @@ public final class IndexWriterConfig imp
* Determines the minimal number of documents required before the buffered
* in-memory documents are flushed as a new Segment. Large values generally
* give faster indexing.
- *
+ *
* <p>
* When this is set, the writer will flush every maxBufferedDocs added
* documents. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent triggering a
* flush due to number of buffered documents. Note that if flushing by RAM
* usage is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ *
* <p>
* Disabled by default (writer flushes by RAM usage).
- *
+ *
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
*
* @see #setRAMBufferSizeMB(double)
- *
+ * @see #setFlushPolicy(FlushPolicy)
* @throws IllegalArgumentException
* if maxBufferedDocs is enabled but smaller than 2, or it disables
* maxBufferedDocs when ramBufferSize is already disabled
@@ -473,7 +478,7 @@ public final class IndexWriterConfig imp
/**
* Returns the number of buffered added documents that will trigger a flush if
* enabled.
- *
+ *
* @see #setMaxBufferedDocs(int)
*/
public int getMaxBufferedDocs() {
@@ -519,32 +524,43 @@ public final class IndexWriterConfig imp
return codecProvider;
}
-
+
/**
* Returns the current MergePolicy in use by this writer.
- *
+ *
* @see #setMergePolicy(MergePolicy)
*/
public MergePolicy getMergePolicy() {
return mergePolicy;
}
- /**
- * Sets the max number of simultaneous threads that may be indexing documents
- * at once in IndexWriter. Values < 1 are invalid and if passed
- * <code>maxThreadStates</code> will be set to
- * {@link #DEFAULT_MAX_THREAD_STATES}.
- *
- * <p>Only takes effect when IndexWriter is first created. */
- public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
- this.maxThreadStates = maxThreadStates < 1 ? DEFAULT_MAX_THREAD_STATES : maxThreadStates;
+ /** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
+ * IndexWriter to assign thread-states to incoming indexing threads. If no
+ * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
+ * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
+ * thread-states set to {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see
+ * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}).
+ * </p>
+ * <p>
+ * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
+ * other {@link IndexWriter} instances once it has been initialized / associated with an
+ * {@link IndexWriter}.
+ * </p>
+ * <p>
+ * NOTE: This only takes effect when IndexWriter is first created.</p>*/
+ public IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
+ if(threadPool == null) {
+ throw new IllegalArgumentException("DocumentsWriterPerThreadPool must not be null");
+ }
+ this.indexerThreadPool = threadPool;
return this;
}
- /** Returns the max number of simultaneous threads that
- * may be indexing documents at once in IndexWriter. */
- public int getMaxThreadStates() {
- return maxThreadStates;
+ /** Returns the configured {@link DocumentsWriterPerThreadPool} instance.
+ * @see #setIndexerThreadPool(DocumentsWriterPerThreadPool)
+ * @return the configured {@link DocumentsWriterPerThreadPool} instance.*/
+ public DocumentsWriterPerThreadPool getIndexerThreadPool() {
+ return this.indexerThreadPool;
}
/** By default, IndexWriter does not pool the
@@ -572,10 +588,10 @@ public final class IndexWriterConfig imp
*
* <p>Only takes effect when IndexWriter is first created. */
IndexWriterConfig setIndexingChain(IndexingChain indexingChain) {
- this.indexingChain = indexingChain == null ? DocumentsWriter.defaultIndexingChain : indexingChain;
+ this.indexingChain = indexingChain == null ? DocumentsWriterPerThread.defaultIndexingChain : indexingChain;
return this;
}
-
+
/** Returns the indexing chain set on {@link #setIndexingChain(IndexingChain)}. */
IndexingChain getIndexingChain() {
return indexingChain;
@@ -604,6 +620,53 @@ public final class IndexWriterConfig imp
return readerTermsIndexDivisor;
}
+ /**
+ * Expert: Controls when segments are flushed to disk during indexing.
+ * The {@link FlushPolicy} is initialized during {@link IndexWriter} instantiation and once initialized
+ * the given instance is bound to this {@link IndexWriter} and should not be used with another writer.
+ * @see #setMaxBufferedDeleteTerms(int)
+ * @see #setMaxBufferedDocs(int)
+ * @see #setRAMBufferSizeMB(double)
+ */
+ public IndexWriterConfig setFlushPolicy(FlushPolicy flushPolicy) {
+ this.flushPolicy = flushPolicy;
+ return this;
+ }
+
+ /**
+ * Expert: Sets the maximum memory consumption per thread triggering a forced
+ * flush if exceeded. A {@link DocumentsWriterPerThread} is forcefully flushed
+ * once it exceeds this limit even if the {@link #getRAMBufferSizeMB()} has
+ * not been exceeded. This is a safety limit to prevent a
+ * {@link DocumentsWriterPerThread} from address space exhaustion due to its
+ * internal 32 bit signed integer based memory addressing.
+ * The given value must be less than 2GB (2048MB)
+ *
+ * @see #DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB
+ */
+ public IndexWriterConfig setRAMPerThreadHardLimitMB(int perThreadHardLimitMB) {
+ if (perThreadHardLimitMB <= 0 || perThreadHardLimitMB >= 2048) {
+ throw new IllegalArgumentException("PerThreadHardLimit must be greater than 0 and less than 2048MB");
+ }
+ this.perThreadHardLimitMB = perThreadHardLimitMB;
+ return this;
+ }
+
+ /**
+ * Returns the max amount of memory each {@link DocumentsWriterPerThread} can
+ * consume until forcefully flushed.
+ * @see #setRAMPerThreadHardLimitMB(int)
+ */
+ public int getRAMPerThreadHardLimitMB() {
+ return perThreadHardLimitMB;
+ }
+ /**
+ * @see #setFlushPolicy(FlushPolicy)
+ */
+ public FlushPolicy getFlushPolicy() {
+ return flushPolicy;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -623,9 +686,13 @@ public final class IndexWriterConfig imp
sb.append("mergedSegmentWarmer=").append(mergedSegmentWarmer).append("\n");
sb.append("codecProvider=").append(codecProvider).append("\n");
sb.append("mergePolicy=").append(mergePolicy).append("\n");
- sb.append("maxThreadStates=").append(maxThreadStates).append("\n");
+ sb.append("indexerThreadPool=").append(indexerThreadPool).append("\n");
sb.append("readerPooling=").append(readerPooling).append("\n");
sb.append("readerTermsIndexDivisor=").append(readerTermsIndexDivisor).append("\n");
+ sb.append("flushPolicy=").append(flushPolicy).append("\n");
+ sb.append("perThreadHardLimitMB=").append(perThreadHardLimitMB).append("\n");
+
return sb.toString();
}
+
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IntBlockPool.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IntBlockPool.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IntBlockPool.java Mon May 9 15:24:04 2011
@@ -1,5 +1,7 @@
package org.apache.lucene.index;
+import java.util.Arrays;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -22,24 +24,24 @@ final class IntBlockPool {
public int[][] buffers = new int[10][];
int bufferUpto = -1; // Which buffer we are upto
- public int intUpto = DocumentsWriter.INT_BLOCK_SIZE; // Where we are in head buffer
+ public int intUpto = DocumentsWriterPerThread.INT_BLOCK_SIZE; // Where we are in head buffer
public int[] buffer; // Current head buffer
- public int intOffset = -DocumentsWriter.INT_BLOCK_SIZE; // Current head offset
+ public int intOffset = -DocumentsWriterPerThread.INT_BLOCK_SIZE; // Current head offset
- final private DocumentsWriter docWriter;
+ final private DocumentsWriterPerThread docWriter;
- public IntBlockPool(DocumentsWriter docWriter) {
+ public IntBlockPool(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
}
public void reset() {
if (bufferUpto != -1) {
- if (bufferUpto > 0)
- // Recycle all but the first buffer
- docWriter.recycleIntBlocks(buffers, 1, 1+bufferUpto);
-
// Reuse first buffer
+ if (bufferUpto > 0) {
+ docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
+ Arrays.fill(buffers, 1, bufferUpto, null);
+ }
bufferUpto = 0;
intUpto = 0;
intOffset = 0;
@@ -57,7 +59,7 @@ final class IntBlockPool {
bufferUpto++;
intUpto = 0;
- intOffset += DocumentsWriter.INT_BLOCK_SIZE;
+ intOffset += DocumentsWriterPerThread.INT_BLOCK_SIZE;
}
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java Mon May 9 15:24:04 2011
@@ -17,20 +17,22 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.Collection;
-import java.util.Map;
import java.io.IOException;
+import java.util.Map;
abstract class InvertedDocConsumer {
- /** Add a new thread */
- abstract InvertedDocConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
-
/** Abort (called after hitting AbortException) */
abstract void abort();
/** Flush a new segment */
- abstract void flush(Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+ abstract void flush(Map<FieldInfo, InvertedDocConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
+
+ abstract InvertedDocConsumerPerField addField(DocInverterPerField docInverterPerField, FieldInfo fieldInfo);
+
+ abstract void startDocument() throws IOException;
+
+ abstract void finishDocument() throws IOException;
/** Attempt to free RAM, returning true if any RAM was
* freed */
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java Mon May 9 15:24:04 2011
@@ -17,12 +17,13 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.Collection;
-import java.util.Map;
import java.io.IOException;
+import java.util.Map;
abstract class InvertedDocEndConsumer {
- abstract InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
- abstract void flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+ abstract void flush(Map<FieldInfo, InvertedDocEndConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
abstract void abort();
+ abstract InvertedDocEndConsumerPerField addField(DocInverterPerField docInverterPerField, FieldInfo fieldInfo);
+ abstract void startDocument() throws IOException;
+ abstract void finishDocument() throws IOException;
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java Mon May 9 15:24:04 2011
@@ -20,7 +20,6 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@@ -72,7 +71,6 @@ public abstract class LogMergePolicy ext
// out there wrote his own LMP ...
protected long maxMergeSizeForOptimize = Long.MAX_VALUE;
protected int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
- protected boolean requireContiguousMerge = false;
protected double noCFSRatio = DEFAULT_NO_CFS_RATIO;
@@ -111,21 +109,6 @@ public abstract class LogMergePolicy ext
writer.get().message("LMP: " + message);
}
- /** If true, merges must be in-order slice of the
- * segments. If false, then the merge policy is free to
- * pick any segments. The default is false, which is
- * in general more efficient than true since it gives the
- * merge policy more freedom to pick closely sized
- * segments. */
- public void setRequireContiguousMerge(boolean v) {
- requireContiguousMerge = v;
- }
-
- /** See {@link #setRequireContiguousMerge}. */
- public boolean getRequireContiguousMerge() {
- return requireContiguousMerge;
- }
-
/** <p>Returns the number of segments that are merged at
* once and also controls the total number of segments
* allowed to accumulate in the index.</p> */
@@ -378,8 +361,6 @@ public abstract class LogMergePolicy ext
return null;
}
- // TODO: handle non-contiguous merge case differently?
-
// Find the newest (rightmost) segment that needs to
// be optimized (other segments may have been flushed
// since optimize started):
@@ -499,14 +480,6 @@ public abstract class LogMergePolicy ext
}
}
- private static class SortByIndex implements Comparator<SegmentInfoAndLevel> {
- public int compare(SegmentInfoAndLevel o1, SegmentInfoAndLevel o2) {
- return o1.index - o2.index;
- }
- }
-
- private static final SortByIndex sortByIndex = new SortByIndex();
-
/** Checks if any merges are now necessary and returns a
* {@link MergePolicy.MergeSpecification} if so. A merge
* is necessary when there are more than {@link
@@ -532,31 +505,24 @@ public abstract class LogMergePolicy ext
final SegmentInfo info = infos.info(i);
long size = size(info);
- // When we require contiguous merge, we still add the
- // segment to levels to avoid merging "across" a set
- // of segment being merged:
- if (!requireContiguousMerge && mergingSegments.contains(info)) {
- if (verbose()) {
- message("seg " + info.name + " already being merged; skip");
- }
- continue;
- }
-
// Floor tiny segments
if (size < 1) {
size = 1;
}
+
final SegmentInfoAndLevel infoLevel = new SegmentInfoAndLevel(info, (float) Math.log(size)/norm, i);
levels.add(infoLevel);
+
if (verbose()) {
- message("seg " + info.name + " level=" + infoLevel.level + " size=" + size);
+ final long segBytes = sizeBytes(info);
+ String extra = mergingSegments.contains(info) ? " [merging]" : "";
+ if (size >= maxMergeSize) {
+ extra += " [skip: too large]";
+ }
+ message("seg=" + writer.get().segString(info) + " level=" + infoLevel.level + " size=" + String.format("%.3f MB", segBytes/1024/1024.) + extra);
}
}
- if (!requireContiguousMerge) {
- Collections.sort(levels);
- }
-
final float levelFloor;
if (minMergeSize <= 0)
levelFloor = (float) 0.0;
@@ -614,23 +580,29 @@ public abstract class LogMergePolicy ext
int end = start + mergeFactor;
while(end <= 1+upto) {
boolean anyTooLarge = false;
+ boolean anyMerging = false;
for(int i=start;i<end;i++) {
final SegmentInfo info = levels.get(i).info;
anyTooLarge |= (size(info) >= maxMergeSize || sizeDocs(info) >= maxMergeDocs);
+ if (mergingSegments.contains(info)) {
+ anyMerging = true;
+ break;
+ }
}
- if (!anyTooLarge) {
+ if (anyMerging) {
+ // skip
+ } else if (!anyTooLarge) {
if (spec == null)
spec = new MergeSpecification();
- if (verbose()) {
- message(" " + start + " to " + end + ": add this merge");
- }
- Collections.sort(levels.subList(start, end), sortByIndex);
final SegmentInfos mergeInfos = new SegmentInfos();
for(int i=start;i<end;i++) {
mergeInfos.add(levels.get(i).info);
assert infos.contains(levels.get(i).info);
}
+ if (verbose()) {
+ message(" add merge=" + writer.get().segString(mergeInfos) + " start=" + start + " end=" + end);
+ }
spec.add(new OneMerge(mergeInfos));
} else if (verbose()) {
message(" " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs; skipping");
@@ -682,7 +654,7 @@ public abstract class LogMergePolicy ext
sb.append("calibrateSizeByDeletes=").append(calibrateSizeByDeletes).append(", ");
sb.append("maxMergeDocs=").append(maxMergeDocs).append(", ");
sb.append("useCompoundFile=").append(useCompoundFile).append(", ");
- sb.append("requireContiguousMerge=").append(requireContiguousMerge);
+ sb.append("noCFSRatio=").append(noCFSRatio);
sb.append("]");
return sb.toString();
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/MergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/MergePolicy.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/MergePolicy.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/MergePolicy.java Mon May 9 15:24:04 2011
@@ -72,6 +72,7 @@ public abstract class MergePolicy implem
long mergeGen; // used by IndexWriter
boolean isExternal; // used by IndexWriter
int maxNumSegmentsOptimize; // used by IndexWriter
+ long estimatedMergeBytes; // used by IndexWriter
List<SegmentReader> readers; // used by IndexWriter
List<SegmentReader> readerClones; // used by IndexWriter
public final SegmentInfos segments;
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriter.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriter.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriter.java Mon May 9 15:24:04 2011
@@ -19,11 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.HashMap;
import java.util.Map;
-import java.util.List;
-import java.util.ArrayList;
import org.apache.lucene.store.IndexOutput;
@@ -36,10 +32,6 @@ import org.apache.lucene.store.IndexOutp
final class NormsWriter extends InvertedDocEndConsumer {
- @Override
- public InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread) {
- return new NormsWriterPerThread(docInverterPerThread, this);
- }
@Override
public void abort() {}
@@ -50,40 +42,11 @@ final class NormsWriter extends Inverted
/** Produce _X.nrm if any document had a field with norms
* not disabled */
@Override
- public void flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
- final Map<FieldInfo,List<NormsWriterPerField>> byField = new HashMap<FieldInfo,List<NormsWriterPerField>>();
-
+ public void flush(Map<FieldInfo,InvertedDocEndConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
if (!state.fieldInfos.hasNorms()) {
return;
}
- // Typically, each thread will have encountered the same
- // field. So first we collate by field, ie, all
- // per-thread field instances that correspond to the
- // same FieldInfo
- for (final Map.Entry<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> entry : threadsAndFields.entrySet()) {
- final Collection<InvertedDocEndConsumerPerField> fields = entry.getValue();
- final Iterator<InvertedDocEndConsumerPerField> fieldsIt = fields.iterator();
-
- while (fieldsIt.hasNext()) {
- final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next();
-
- if (perField.upto > 0) {
- // It has some norms
- List<NormsWriterPerField> l = byField.get(perField.fieldInfo);
- if (l == null) {
- l = new ArrayList<NormsWriterPerField>();
- byField.put(perField.fieldInfo, l);
- }
- l.add(perField);
- } else
- // Remove this field since we haven't seen it
- // since the previous flush
- fieldsIt.remove();
- }
- }
-
final String normsFileName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.NORMS_EXTENSION);
IndexOutput normsOut = state.directory.createOutput(normsFileName);
@@ -93,60 +56,25 @@ final class NormsWriter extends Inverted
int normCount = 0;
for (FieldInfo fi : state.fieldInfos) {
- final List<NormsWriterPerField> toMerge = byField.get(fi);
+ final NormsWriterPerField toWrite = (NormsWriterPerField) fieldsToFlush.get(fi);
int upto = 0;
- if (toMerge != null) {
-
- final int numFields = toMerge.size();
-
+ if (toWrite != null && toWrite.upto > 0) {
normCount++;
- final NormsWriterPerField[] fields = new NormsWriterPerField[numFields];
- int[] uptos = new int[numFields];
-
- for(int j=0;j<numFields;j++)
- fields[j] = toMerge.get(j);
-
- int numLeft = numFields;
-
- while(numLeft > 0) {
-
- assert uptos[0] < fields[0].docIDs.length : " uptos[0]=" + uptos[0] + " len=" + (fields[0].docIDs.length);
-
- int minLoc = 0;
- int minDocID = fields[0].docIDs[uptos[0]];
-
- for(int j=1;j<numLeft;j++) {
- final int docID = fields[j].docIDs[uptos[j]];
- if (docID < minDocID) {
- minDocID = docID;
- minLoc = j;
- }
- }
-
- assert minDocID < state.numDocs;
-
- // Fill hole
- for(;upto<minDocID;upto++)
+ int docID = 0;
+ for (; docID < state.numDocs; docID++) {
+ if (upto < toWrite.upto && toWrite.docIDs[upto] == docID) {
+ normsOut.writeByte(toWrite.norms[upto]);
+ upto++;
+ } else {
normsOut.writeByte((byte) 0);
-
- normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]);
- (uptos[minLoc])++;
- upto++;
-
- if (uptos[minLoc] == fields[minLoc].upto) {
- fields[minLoc].reset();
- if (minLoc != numLeft-1) {
- fields[minLoc] = fields[numLeft-1];
- uptos[minLoc] = uptos[numLeft-1];
- }
- numLeft--;
}
}
-
- // Fill final hole with defaultNorm
- for(;upto<state.numDocs;upto++)
- normsOut.writeByte((byte) 0);
+
+ // we should have consumed every norm
+ assert upto == toWrite.upto;
+
+ toWrite.reset();
} else if (fi.isIndexed && !fi.omitNorms) {
normCount++;
// Fill entire field with default norm:
@@ -161,4 +89,16 @@ final class NormsWriter extends Inverted
normsOut.close();
}
}
+
+ @Override
+ void finishDocument() throws IOException {}
+
+ @Override
+ void startDocument() throws IOException {}
+
+ @Override
+ InvertedDocEndConsumerPerField addField(DocInverterPerField docInverterPerField,
+ FieldInfo fieldInfo) {
+ return new NormsWriterPerField(docInverterPerField, fieldInfo);
+ }
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java Mon May 9 15:24:04 2011
@@ -27,9 +27,8 @@ import org.apache.lucene.util.ArrayUtil;
final class NormsWriterPerField extends InvertedDocEndConsumerPerField implements Comparable<NormsWriterPerField> {
- final NormsWriterPerThread perThread;
final FieldInfo fieldInfo;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final Similarity similarity;
// Holds all docID/norm pairs we've seen
@@ -46,10 +45,9 @@ final class NormsWriterPerField extends
upto = 0;
}
- public NormsWriterPerField(final DocInverterPerField docInverterPerField, final NormsWriterPerThread perThread, final FieldInfo fieldInfo) {
- this.perThread = perThread;
+ public NormsWriterPerField(final DocInverterPerField docInverterPerField, final FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
- docState = perThread.docState;
+ docState = docInverterPerField.docState;
fieldState = docInverterPerField.fieldState;
similarity = docState.similarityProvider.get(fieldInfo.name);
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentInfo.java Mon May 9 15:24:04 2011
@@ -37,14 +37,14 @@ import org.apache.lucene.util.Constants;
/**
* Information about a segment such as it's name, directory, and files related
* to the segment.
- *
+ *
* @lucene.experimental
*/
public final class SegmentInfo {
static final int NO = -1; // e.g. no norms; no deletes;
static final int YES = 1; // e.g. have norms; have deletes;
- static final int WITHOUT_GEN = 0; // a file name that has no GEN in it.
+ static final int WITHOUT_GEN = 0; // a file name that has no GEN in it.
public String name; // unique name in dir
public int docCount; // number of docs in seg
@@ -56,7 +56,7 @@ public final class SegmentInfo {
* - YES or higher if there are deletes at generation N
*/
private long delGen;
-
+
/*
* Current generation of each field's norm file. If this array is null,
* means no separate norms. If this array is not null, its values mean:
@@ -65,7 +65,7 @@ public final class SegmentInfo {
*/
private Map<Integer,Long> normGen;
- private boolean isCompoundFile;
+ private boolean isCompoundFile;
private volatile List<String> files; // cached list of files that this segment uses
// in the Directory
@@ -73,10 +73,13 @@ public final class SegmentInfo {
private volatile long sizeInBytesNoStore = -1; // total byte size of all but the store files (computed on demand)
private volatile long sizeInBytesWithStore = -1; // total byte size of all of our files (computed on demand)
+ //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
private int docStoreOffset; // if this segment shares stored fields & vectors, this
// offset is where in that file this segment's docs begin
+ //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
private String docStoreSegment; // name used to derive fields/vectors file we share with
// other segments
+ //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
private boolean docStoreIsCompoundFile; // whether doc store files are stored in compound file (*.cfx)
private int delCount; // How many deleted docs in this segment
@@ -91,9 +94,9 @@ public final class SegmentInfo {
private Map<String,String> diagnostics;
- // Tracks the Lucene version this segment was created with, since 3.1. Null
+ // Tracks the Lucene version this segment was created with, since 3.1. Null
// indicates an older than 3.0 index, and it's used to detect a too old index.
- // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
+ // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
// specific versions afterwards ("3.0", "3.1" etc.).
// see Constants.LUCENE_MAIN_VERSION.
private String version;
@@ -101,7 +104,7 @@ public final class SegmentInfo {
// NOTE: only used in-RAM by IW to track buffered deletes;
// this is never written to/read from the Directory
private long bufferedDeletesGen;
-
+
public SegmentInfo(String name, int docCount, Directory dir, boolean isCompoundFile,
boolean hasProx, SegmentCodecs segmentCodecs, boolean hasVectors, FieldInfos fieldInfos) {
this.name = name;
@@ -182,11 +185,13 @@ public final class SegmentInfo {
docStoreSegment = name;
docStoreIsCompoundFile = false;
}
+
if (format > DefaultSegmentInfosWriter.FORMAT_4_0) {
// pre-4.0 indexes write a byte if there is a single norms file
byte b = input.readByte();
assert 1 == b;
}
+
int numNormGen = input.readInt();
if (numNormGen == NO) {
normGen = null;
@@ -207,7 +212,7 @@ public final class SegmentInfo {
assert delCount <= docCount;
hasProx = input.readByte() == YES;
-
+
// System.out.println(Thread.currentThread().getName() + ": si.read hasProx=" + hasProx + " seg=" + name);
if (format <= DefaultSegmentInfosWriter.FORMAT_4_0) {
segmentCodecs = new SegmentCodecs(codecs, input);
@@ -217,7 +222,7 @@ public final class SegmentInfo {
segmentCodecs = new SegmentCodecs(codecs, new Codec[] { codecs.lookup("PreFlex")});
}
diagnostics = input.readStringStringMap();
-
+
if (format <= DefaultSegmentInfosWriter.FORMAT_HAS_VECTORS) {
hasVectors = input.readByte() == 1;
} else {
@@ -366,7 +371,7 @@ public final class SegmentInfo {
// against this segment
return null;
} else {
- return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
+ return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
}
}
@@ -432,7 +437,7 @@ public final class SegmentInfo {
if (hasSeparateNorms(number)) {
return IndexFileNames.fileNameFromGeneration(name, "s" + number, normGen.get(number));
} else {
- // single file for all norms
+ // single file for all norms
return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.NORMS_EXTENSION, WITHOUT_GEN);
}
}
@@ -465,39 +470,74 @@ public final class SegmentInfo {
assert delCount <= docCount;
}
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
public int getDocStoreOffset() {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
return docStoreOffset;
}
-
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
public boolean getDocStoreIsCompoundFile() {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
return docStoreIsCompoundFile;
}
-
- void setDocStoreIsCompoundFile(boolean v) {
- docStoreIsCompoundFile = v;
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
+ public void setDocStoreIsCompoundFile(boolean docStoreIsCompoundFile) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+ this.docStoreIsCompoundFile = docStoreIsCompoundFile;
clearFilesCache();
}
-
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
+ void setDocStore(int offset, String segment, boolean isCompoundFile) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+ docStoreOffset = offset;
+ docStoreSegment = segment;
+ docStoreIsCompoundFile = isCompoundFile;
+ clearFilesCache();
+ }
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
public String getDocStoreSegment() {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
return docStoreSegment;
}
-
- public void setDocStoreSegment(String segment) {
- docStoreSegment = segment;
- }
-
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
void setDocStoreOffset(int offset) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
docStoreOffset = offset;
clearFilesCache();
}
- void setDocStore(int offset, String segment, boolean isCompoundFile) {
- docStoreOffset = offset;
- docStoreSegment = segment;
- docStoreIsCompoundFile = isCompoundFile;
- clearFilesCache();
+ /**
+ * @deprecated shared doc stores are not supported in 4.0
+ */
+ @Deprecated
+ public void setDocStoreSegment(String docStoreSegment) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+ this.docStoreSegment = docStoreSegment;
}
-
+
/** Save this segment's info. */
public void write(IndexOutput output)
throws IOException {
@@ -507,12 +547,14 @@ public final class SegmentInfo {
output.writeString(name);
output.writeInt(docCount);
output.writeLong(delGen);
+
output.writeInt(docStoreOffset);
if (docStoreOffset != -1) {
output.writeString(docStoreSegment);
output.writeByte((byte) (docStoreIsCompoundFile ? 1:0));
}
+
if (normGen == null) {
output.writeInt(NO);
} else {
@@ -522,7 +564,7 @@ public final class SegmentInfo {
output.writeLong(entry.getValue());
}
}
-
+
output.writeByte((byte) (isCompoundFile ? YES : NO));
output.writeInt(delCount);
output.writeByte((byte) (hasProx ? 1:0));
@@ -570,9 +612,9 @@ public final class SegmentInfo {
// Already cached:
return files;
}
-
+
Set<String> fileSet = new HashSet<String>();
-
+
boolean useCompoundFile = getUseCompoundFile();
if (useCompoundFile) {
@@ -606,7 +648,7 @@ public final class SegmentInfo {
fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_INDEX_EXTENSION));
fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_DOCUMENTS_EXTENSION));
fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_FIELDS_EXTENSION));
- }
+ }
}
String delFileName = IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
@@ -644,7 +686,7 @@ public final class SegmentInfo {
}
/** Used for debugging. Format may suddenly change.
- *
+ *
* <p>Current format looks like
* <code>_a(3.1):c45/4->_1</code>, which means the segment's
* name is <code>_a</code>; it was created with Lucene 3.1 (or
@@ -659,7 +701,6 @@ public final class SegmentInfo {
StringBuilder s = new StringBuilder();
s.append(name).append('(').append(version == null ? "?" : version).append(')').append(':');
-
char cfs = getUseCompoundFile() ? 'c' : 'C';
s.append(cfs);
@@ -675,7 +716,7 @@ public final class SegmentInfo {
if (delCount != 0) {
s.append('/').append(delCount);
}
-
+
if (docStoreOffset != -1) {
s.append("->").append(docStoreSegment);
if (docStoreIsCompoundFile) {
@@ -715,13 +756,13 @@ public final class SegmentInfo {
* <b>NOTE:</b> this method is used for internal purposes only - you should
* not modify the version of a SegmentInfo, or it may result in unexpected
* exceptions thrown when you attempt to open the index.
- *
+ *
* @lucene.internal
*/
public void setVersion(String version) {
this.version = version;
}
-
+
/** Returns the version of the code which wrote the segment. */
public String getVersion() {
return version;
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentMerger.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentMerger.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentMerger.java Mon May 9 15:24:04 2011
@@ -39,24 +39,24 @@ import org.apache.lucene.util.ReaderUtil
/**
* The SegmentMerger class combines two or more Segments, represented by an IndexReader ({@link #add},
- * into a single Segment. After adding the appropriate readers, call the merge method to combine the
+ * into a single Segment. After adding the appropriate readers, call the merge method to combine the
* segments.
- *
+ *
* @see #merge
* @see #add
*/
final class SegmentMerger {
-
+
/** norms header placeholder */
- static final byte[] NORMS_HEADER = new byte[]{'N','R','M',-1};
-
+ static final byte[] NORMS_HEADER = new byte[]{'N','R','M',-1};
+
private Directory directory;
private String segment;
private int termIndexInterval = IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL;
private List<IndexReader> readers = new ArrayList<IndexReader>();
private final FieldInfos fieldInfos;
-
+
private int mergedDocs;
private final MergeState.CheckAbort checkAbort;
@@ -64,13 +64,13 @@ final class SegmentMerger {
/** Maximum number of contiguous documents to bulk-copy
when merging stored fields */
private final static int MAX_RAW_MERGE_DOCS = 4192;
-
+
private final CodecProvider codecs;
private Codec codec;
private SegmentWriteState segmentWriteState;
private PayloadProcessorProvider payloadProcessorProvider;
-
+
SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs, PayloadProcessorProvider payloadProcessorProvider, FieldInfos fieldInfos) {
this.payloadProcessorProvider = payloadProcessorProvider;
directory = dir;
@@ -135,10 +135,10 @@ final class SegmentMerger {
for (String file : files) {
cfsWriter.addFile(file);
}
-
+
// Perform the merge
cfsWriter.close();
-
+
return files;
}
@@ -196,13 +196,12 @@ final class SegmentMerger {
}
/**
- *
+ *
* @return The number of documents in all of the readers
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
private int mergeFields() throws CorruptIndexException, IOException {
-
for (IndexReader reader : readers) {
if (reader instanceof SegmentReader) {
SegmentReader segmentReader = (SegmentReader) reader;
@@ -265,7 +264,7 @@ final class SegmentMerger {
throw new RuntimeException("mergeFields produced an invalid result: docCount is " + docCount + " but fdx file size is " + fdxFileLength + " file=" + fileName + " file exists?=" + directory.fileExists(fileName) + "; now aborting this merge to prevent index corruption");
segmentWriteState = new SegmentWriteState(null, directory, segment, fieldInfos, docCount, termIndexInterval, codecInfo, null);
-
+
return docCount;
}
@@ -283,7 +282,7 @@ final class SegmentMerger {
++j;
continue;
}
- // We can optimize this case (doing a bulk byte copy) since the field
+ // We can optimize this case (doing a bulk byte copy) since the field
// numbers are identical
int start = j, numDocs = 0;
do {
@@ -295,7 +294,7 @@ final class SegmentMerger {
break;
}
} while(numDocs < MAX_RAW_MERGE_DOCS);
-
+
IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs);
fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs);
docCount += numDocs;
@@ -349,7 +348,7 @@ final class SegmentMerger {
* @throws IOException
*/
private final void mergeVectors() throws IOException {
- TermVectorsWriter termVectorsWriter =
+ TermVectorsWriter termVectorsWriter =
new TermVectorsWriter(directory, segment, fieldInfos);
try {
@@ -369,7 +368,7 @@ final class SegmentMerger {
copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader);
} else {
copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader);
-
+
}
}
} finally {
@@ -402,7 +401,7 @@ final class SegmentMerger {
++docNum;
continue;
}
- // We can optimize this case (doing a bulk byte copy) since the field
+ // We can optimize this case (doing a bulk byte copy) since the field
// numbers are identical
int start = docNum, numDocs = 0;
do {
@@ -414,7 +413,7 @@ final class SegmentMerger {
break;
}
} while(numDocs < MAX_RAW_MERGE_DOCS);
-
+
matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, start, numDocs);
termVectorsWriter.addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs);
checkAbort.work(300 * numDocs);
@@ -425,7 +424,7 @@ final class SegmentMerger {
// skip deleted docs
continue;
}
-
+
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
@@ -434,7 +433,7 @@ final class SegmentMerger {
}
}
}
-
+
private void copyVectorsNoDeletions(final TermVectorsWriter termVectorsWriter,
final TermVectorsReader matchingVectorsReader,
final IndexReader reader)
@@ -470,7 +469,7 @@ final class SegmentMerger {
// Let CodecProvider decide which codec will be used to write
// the new segment:
-
+
int docBase = 0;
final List<Fields> fields = new ArrayList<Fields>();
@@ -498,7 +497,7 @@ final class SegmentMerger {
mergeState.readerCount = readers.size();
mergeState.fieldInfos = fieldInfos;
mergeState.mergedDocCount = mergedDocs;
-
+
// Remap docIDs
mergeState.delCounts = new int[mergeState.readerCount];
mergeState.docMaps = new int[mergeState.readerCount][];
@@ -536,7 +535,7 @@ final class SegmentMerger {
}
assert delCount == mergeState.delCounts[i]: "reader delCount=" + mergeState.delCounts[i] + " vs recomputed delCount=" + delCount;
}
-
+
if (payloadProcessorProvider != null) {
mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.directory());
}
@@ -549,7 +548,7 @@ final class SegmentMerger {
// apart when we step through the docs enums in
// MultiDocsEnum.
mergeState.multiDeletedDocs = new MultiBits(bits, bitsStarts);
-
+
try {
consumer.merge(mergeState,
new MultiFields(fields.toArray(Fields.EMPTY_ARRAY),
@@ -568,7 +567,7 @@ final class SegmentMerger {
int[] getDelCounts() {
return mergeState.delCounts;
}
-
+
public boolean getAnyNonBulkMerges() {
assert matchedCount <= readers.size();
return matchedCount != readers.size();
@@ -579,7 +578,7 @@ final class SegmentMerger {
try {
for (FieldInfo fi : fieldInfos) {
if (fi.isIndexed && !fi.omitNorms) {
- if (output == null) {
+ if (output == null) {
output = directory.createOutput(IndexFileNames.segmentFileName(segment, "", IndexFileNames.NORMS_EXTENSION));
output.writeBytes(NORMS_HEADER,NORMS_HEADER.length);
}
@@ -610,7 +609,7 @@ final class SegmentMerger {
}
}
} finally {
- if (output != null) {
+ if (output != null) {
output.close();
}
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentReader.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentReader.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/SegmentReader.java Mon May 9 15:24:04 2011
@@ -55,6 +55,9 @@ public class SegmentReader extends Index
AtomicInteger deletedDocsRef = null;
private boolean deletedDocsDirty = false;
private boolean normsDirty = false;
+
+ // TODO: we should move this tracking into SegmentInfo;
+ // this way SegmentInfo.toString shows pending deletes
private int pendingDeleteCount;
private boolean rollbackHasChanges = false;
@@ -803,8 +806,9 @@ public class SegmentReader extends Index
oldRef.decrementAndGet();
}
deletedDocsDirty = true;
- if (!deletedDocs.getAndSet(docNum))
+ if (!deletedDocs.getAndSet(docNum)) {
pendingDeleteCount++;
+ }
}
@Override
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java Mon May 9 15:24:04 2011
@@ -18,7 +18,8 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import org.apache.lucene.store.RAMOutputStream;
+
+import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
@@ -26,22 +27,38 @@ import org.apache.lucene.util.RamUsageEs
final class StoredFieldsWriter {
FieldsWriter fieldsWriter;
- final DocumentsWriter docWriter;
+ final DocumentsWriterPerThread docWriter;
int lastDocID;
- PerDoc[] docFreeList = new PerDoc[1];
int freeCount;
- public StoredFieldsWriter(DocumentsWriter docWriter) {
+ final DocumentsWriterPerThread.DocState docState;
+
+ public StoredFieldsWriter(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
+ this.docState = docWriter.docState;
+ }
+
+ private int numStoredFields;
+ private Fieldable[] storedFields;
+ private int[] fieldNumbers;
+
+ public void reset() {
+ numStoredFields = 0;
+ storedFields = new Fieldable[1];
+ fieldNumbers = new int[1];
}
- public StoredFieldsWriterPerThread addThread(DocumentsWriter.DocState docState) throws IOException {
- return new StoredFieldsWriterPerThread(docState, this);
+ public void startDocument() {
+ reset();
}
- synchronized public void flush(SegmentWriteState state) throws IOException {
- if (state.numDocs > lastDocID) {
+ public void flush(SegmentWriteState state) throws IOException {
+
+ if (state.numDocs > 0) {
+ // It's possible that all documents seen in this segment
+ // hit non-aborting exceptions, in which case we will
+ // not have yet init'd the FieldsWriter:
initFieldsWriter();
fill(state.numDocs);
}
@@ -67,23 +84,9 @@ final class StoredFieldsWriter {
int allocCount;
- synchronized PerDoc getPerDoc() {
- if (freeCount == 0) {
- allocCount++;
- if (allocCount > docFreeList.length) {
- // Grow our free list up front to make sure we have
- // enough space to recycle all outstanding PerDoc
- // instances
- assert allocCount == 1+docFreeList.length;
- docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
- }
- return new PerDoc();
- } else {
- return docFreeList[--freeCount];
- }
- }
+ void abort() {
+ reset();
- synchronized void abort() {
if (fieldsWriter != null) {
fieldsWriter.abort();
fieldsWriter = null;
@@ -101,53 +104,40 @@ final class StoredFieldsWriter {
}
}
- synchronized void finishDocument(PerDoc perDoc) throws IOException {
+ void finishDocument() throws IOException {
assert docWriter.writer.testPoint("StoredFieldsWriter.finishDocument start");
+
initFieldsWriter();
+ fill(docState.docID);
- fill(perDoc.docID);
+ if (fieldsWriter != null && numStoredFields > 0) {
+ fieldsWriter.startDocument(numStoredFields);
+ for (int i = 0; i < numStoredFields; i++) {
+ fieldsWriter.writeField(fieldNumbers[i], storedFields[i]);
+ }
+ lastDocID++;
+ }
- // Append stored fields to the real FieldsWriter:
- fieldsWriter.flushDocument(perDoc.numStoredFields, perDoc.fdt);
- lastDocID++;
- perDoc.reset();
- free(perDoc);
+ reset();
assert docWriter.writer.testPoint("StoredFieldsWriter.finishDocument end");
}
- synchronized void free(PerDoc perDoc) {
- assert freeCount < docFreeList.length;
- assert 0 == perDoc.numStoredFields;
- assert 0 == perDoc.fdt.length();
- assert 0 == perDoc.fdt.getFilePointer();
- docFreeList[freeCount++] = perDoc;
- }
-
- class PerDoc extends DocumentsWriter.DocWriter {
- final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
- RAMOutputStream fdt = new RAMOutputStream(buffer);
- int numStoredFields;
-
- void reset() {
- fdt.reset();
- buffer.recycle();
- numStoredFields = 0;
+ public void addField(Fieldable field, FieldInfo fieldInfo) throws IOException {
+ if (numStoredFields == storedFields.length) {
+ int newSize = ArrayUtil.oversize(numStoredFields + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+ Fieldable[] newArray = new Fieldable[newSize];
+ System.arraycopy(storedFields, 0, newArray, 0, numStoredFields);
+ storedFields = newArray;
}
- @Override
- void abort() {
- reset();
- free(this);
+ if (numStoredFields == fieldNumbers.length) {
+ fieldNumbers = ArrayUtil.grow(fieldNumbers);
}
- @Override
- public long sizeInBytes() {
- return buffer.getSizeInBytes();
- }
+ storedFields[numStoredFields] = field;
+ fieldNumbers[numStoredFields] = fieldInfo.number;
+ numStoredFields++;
- @Override
- public void finish() throws IOException {
- finishDocument(this);
- }
+ assert docState.testPoint("StoredFieldsWriterPerThread.processFields.writeField");
}
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Mon May 9 15:24:04 2011
@@ -17,49 +17,48 @@ package org.apache.lucene.index;
* limitations under the License.
*/
+import java.io.IOException;
+import java.util.Map;
+
import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
-import java.io.IOException;
-import java.util.Collection;
-
-import java.util.Map;
-
final class TermVectorsTermsWriter extends TermsHashConsumer {
- final DocumentsWriter docWriter;
- PerDoc[] docFreeList = new PerDoc[1];
+ final DocumentsWriterPerThread docWriter;
int freeCount;
IndexOutput tvx;
IndexOutput tvd;
IndexOutput tvf;
int lastDocID;
+
+ final DocumentsWriterPerThread.DocState docState;
+ final BytesRef flushTerm = new BytesRef();
+
+ // Used by perField when serializing the term vectors
+ final ByteSliceReader vectorSliceReader = new ByteSliceReader();
boolean hasVectors;
- public TermVectorsTermsWriter(DocumentsWriter docWriter) {
+ public TermVectorsTermsWriter(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
+ docState = docWriter.docState;
}
@Override
- public TermsHashConsumerPerThread addThread(TermsHashPerThread termsHashPerThread) {
- return new TermVectorsTermsWriterPerThread(termsHashPerThread, this);
- }
-
- @Override
- synchronized void flush(Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> threadsAndFields, final SegmentWriteState state) throws IOException {
+ void flush(Map<FieldInfo, TermsHashConsumerPerField> fieldsToFlush, final SegmentWriteState state) throws IOException {
if (tvx != null) {
// At least one doc in this run had term vectors enabled
fill(state.numDocs);
+ assert state.segmentName != null;
+ String idxName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.VECTORS_INDEX_EXTENSION);
tvx.close();
tvf.close();
tvd.close();
tvx = tvd = tvf = null;
- assert state.segmentName != null;
- String idxName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.VECTORS_INDEX_EXTENSION);
- if (4 + ((long) state.numDocs) * 16 != state.directory.fileLength(idxName)) {
+ if (4+((long) state.numDocs)*16 != state.directory.fileLength(idxName)) {
throw new RuntimeException("after flush: tvx size mismatch: " + state.numDocs + " docs vs " + state.directory.fileLength(idxName) + " length in bytes of " + idxName + " file exists?=" + state.directory.fileExists(idxName));
}
@@ -68,33 +67,10 @@ final class TermVectorsTermsWriter exten
hasVectors = false;
}
- for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
- for (final TermsHashConsumerPerField field : entry.getValue() ) {
- TermVectorsTermsWriterPerField perField = (TermVectorsTermsWriterPerField) field;
- perField.termsHashPerField.reset();
- perField.shrinkHash();
- }
-
- TermVectorsTermsWriterPerThread perThread = (TermVectorsTermsWriterPerThread) entry.getKey();
- perThread.termsHashPerThread.reset(true);
- }
- }
-
- int allocCount;
-
- synchronized PerDoc getPerDoc() {
- if (freeCount == 0) {
- allocCount++;
- if (allocCount > docFreeList.length) {
- // Grow our free list up front to make sure we have
- // enough space to recycle all outstanding PerDoc
- // instances
- assert allocCount == 1+docFreeList.length;
- docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
- }
- return new PerDoc();
- } else {
- return docFreeList[--freeCount];
+ for (final TermsHashConsumerPerField field : fieldsToFlush.values() ) {
+ TermVectorsTermsWriterPerField perField = (TermVectorsTermsWriterPerField) field;
+ perField.termsHashPerField.reset();
+ perField.shrinkHash();
}
}
@@ -112,18 +88,17 @@ final class TermVectorsTermsWriter exten
}
}
- synchronized void initTermVectorsWriter() throws IOException {
+ private final void initTermVectorsWriter() throws IOException {
if (tvx == null) {
// If we hit an exception while init'ing the term
// vector output files, we must abort this segment
// because those files will be in an unknown
// state:
- hasVectors = true;
tvx = docWriter.directory.createOutput(IndexFileNames.segmentFileName(docWriter.getSegment(), "", IndexFileNames.VECTORS_INDEX_EXTENSION));
tvd = docWriter.directory.createOutput(IndexFileNames.segmentFileName(docWriter.getSegment(), "", IndexFileNames.VECTORS_DOCUMENTS_EXTENSION));
tvf = docWriter.directory.createOutput(IndexFileNames.segmentFileName(docWriter.getSegment(), "", IndexFileNames.VECTORS_FIELDS_EXTENSION));
-
+
tvx.writeInt(TermVectorsReader.FORMAT_CURRENT);
tvd.writeInt(TermVectorsReader.FORMAT_CURRENT);
tvf.writeInt(TermVectorsReader.FORMAT_CURRENT);
@@ -132,39 +107,44 @@ final class TermVectorsTermsWriter exten
}
}
- synchronized void finishDocument(PerDoc perDoc) throws IOException {
+ @Override
+ void finishDocument(TermsHash termsHash) throws IOException {
assert docWriter.writer.testPoint("TermVectorsTermsWriter.finishDocument start");
+ if (!hasVectors) {
+ return;
+ }
+
initTermVectorsWriter();
- fill(perDoc.docID);
+ fill(docState.docID);
// Append term vectors to the real outputs:
- tvx.writeLong(tvd.getFilePointer());
+ long pointer = tvd.getFilePointer();
+ tvx.writeLong(pointer);
tvx.writeLong(tvf.getFilePointer());
- tvd.writeVInt(perDoc.numVectorFields);
- if (perDoc.numVectorFields > 0) {
- for(int i=0;i<perDoc.numVectorFields;i++) {
- tvd.writeVInt(perDoc.fieldNumbers[i]);
- }
- assert 0 == perDoc.fieldPointers[0];
- long lastPos = perDoc.fieldPointers[0];
- for(int i=1;i<perDoc.numVectorFields;i++) {
- long pos = perDoc.fieldPointers[i];
+ tvd.writeVInt(numVectorFields);
+ if (numVectorFields > 0) {
+ for(int i=0;i<numVectorFields;i++) {
+ tvd.writeVInt(perFields[i].fieldInfo.number);
+ }
+ long lastPos = tvf.getFilePointer();
+ perFields[0].finishDocument();
+ for(int i=1;i<numVectorFields;i++) {
+ long pos = tvf.getFilePointer();
tvd.writeVLong(pos-lastPos);
lastPos = pos;
+ perFields[i].finishDocument();
}
- perDoc.perDocTvf.writeTo(tvf);
- perDoc.numVectorFields = 0;
}
- assert lastDocID == perDoc.docID;
+ assert lastDocID == docState.docID;
lastDocID++;
- perDoc.reset();
- free(perDoc);
+ termsHash.reset();
+ reset();
assert docWriter.writer.testPoint("TermVectorsTermsWriter.finishDocument end");
}
@@ -189,55 +169,58 @@ final class TermVectorsTermsWriter exten
}
tvx = tvd = tvf = null;
lastDocID = 0;
- }
- synchronized void free(PerDoc doc) {
- assert freeCount < docFreeList.length;
- docFreeList[freeCount++] = doc;
+ reset();
}
- class PerDoc extends DocumentsWriter.DocWriter {
+ int numVectorFields;
- final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
- RAMOutputStream perDocTvf = new RAMOutputStream(buffer);
+ TermVectorsTermsWriterPerField[] perFields;
- int numVectorFields;
+ void reset() {
+ numVectorFields = 0;
+ perFields = new TermVectorsTermsWriterPerField[1];
+ }
- int[] fieldNumbers = new int[1];
- long[] fieldPointers = new long[1];
+ @Override
+ public TermsHashConsumerPerField addField(TermsHashPerField termsHashPerField, FieldInfo fieldInfo) {
+ return new TermVectorsTermsWriterPerField(termsHashPerField, this, fieldInfo);
+ }
- void reset() {
- perDocTvf.reset();
- buffer.recycle();
- numVectorFields = 0;
+ void addFieldToFlush(TermVectorsTermsWriterPerField fieldToFlush) {
+ if (numVectorFields == perFields.length) {
+ int newSize = ArrayUtil.oversize(numVectorFields + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+ TermVectorsTermsWriterPerField[] newArray = new TermVectorsTermsWriterPerField[newSize];
+ System.arraycopy(perFields, 0, newArray, 0, numVectorFields);
+ perFields = newArray;
}
- @Override
- void abort() {
- reset();
- free(this);
- }
+ perFields[numVectorFields++] = fieldToFlush;
+ }
- void addField(final int fieldNumber) {
- if (numVectorFields == fieldNumbers.length) {
- fieldNumbers = ArrayUtil.grow(fieldNumbers);
- }
- if (numVectorFields == fieldPointers.length) {
- fieldPointers = ArrayUtil.grow(fieldPointers);
- }
- fieldNumbers[numVectorFields] = fieldNumber;
- fieldPointers[numVectorFields] = perDocTvf.getFilePointer();
- numVectorFields++;
- }
+ @Override
+ void startDocument() throws IOException {
+ assert clearLastVectorFieldName();
+ reset();
+ }
- @Override
- public long sizeInBytes() {
- return buffer.getSizeInBytes();
- }
+ // Called only by assert
+ final boolean clearLastVectorFieldName() {
+ lastVectorFieldName = null;
+ return true;
+ }
- @Override
- public void finish() throws IOException {
- finishDocument(this);
+ // Called only by assert
+ String lastVectorFieldName;
+ final boolean vectorFieldsInOrder(FieldInfo fi) {
+ try {
+ if (lastVectorFieldName != null)
+ return lastVectorFieldName.compareTo(fi.name) < 0;
+ else
+ return true;
+ } finally {
+ lastVectorFieldName = fi.name;
}
}
+
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Mon May 9 15:24:04 2011
@@ -28,11 +28,10 @@ import org.apache.lucene.util.RamUsageEs
final class TermVectorsTermsWriterPerField extends TermsHashConsumerPerField {
- final TermVectorsTermsWriterPerThread perThread;
final TermsHashPerField termsHashPerField;
final TermVectorsTermsWriter termsWriter;
final FieldInfo fieldInfo;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final FieldInvertState fieldState;
boolean doVectors;
@@ -41,11 +40,10 @@ final class TermVectorsTermsWriterPerFie
int maxNumPostings;
OffsetAttribute offsetAttribute = null;
-
- public TermVectorsTermsWriterPerField(TermsHashPerField termsHashPerField, TermVectorsTermsWriterPerThread perThread, FieldInfo fieldInfo) {
+
+ public TermVectorsTermsWriterPerField(TermsHashPerField termsHashPerField, TermVectorsTermsWriter termsWriter, FieldInfo fieldInfo) {
this.termsHashPerField = termsHashPerField;
- this.perThread = perThread;
- this.termsWriter = perThread.termsWriter;
+ this.termsWriter = termsWriter;
this.fieldInfo = fieldInfo;
docState = termsHashPerField.docState;
fieldState = termsHashPerField.fieldState;
@@ -72,22 +70,12 @@ final class TermVectorsTermsWriterPerFie
}
if (doVectors) {
- if (perThread.doc == null) {
- perThread.doc = termsWriter.getPerDoc();
- perThread.doc.docID = docState.docID;
- assert perThread.doc.numVectorFields == 0;
- assert 0 == perThread.doc.perDocTvf.length();
- assert 0 == perThread.doc.perDocTvf.getFilePointer();
- }
-
- assert perThread.doc.docID == docState.docID;
-
+ termsWriter.hasVectors = true;
if (termsHashPerField.bytesHash.size() != 0) {
// Only necessary if previous doc hit a
// non-aborting exception while writing vectors in
// this field:
termsHashPerField.reset();
- perThread.termsHashPerThread.reset(false);
}
}
@@ -95,42 +83,42 @@ final class TermVectorsTermsWriterPerFie
//perThread.postingsCount = 0;
return doVectors;
- }
+ }
public void abort() {}
/** Called once per field per document if term vectors
* are enabled, to write the vectors to
* RAMOutputStream, which is then quickly flushed to
- * the real term vectors files in the Directory. */
- @Override
+ * the real term vectors files in the Directory. */ @Override
void finish() throws IOException {
+ if (!doVectors || termsHashPerField.bytesHash.size() == 0)
+ return;
+ termsWriter.addFieldToFlush(this);
+ }
+
+ void finishDocument() throws IOException {
assert docState.testPoint("TermVectorsTermsWriterPerField.finish start");
final int numPostings = termsHashPerField.bytesHash.size();
- final BytesRef flushTerm = perThread.flushTerm;
+ final BytesRef flushTerm = termsWriter.flushTerm;
assert numPostings >= 0;
- if (!doVectors || numPostings == 0)
- return;
-
if (numPostings > maxNumPostings)
maxNumPostings = numPostings;
- final IndexOutput tvf = perThread.doc.perDocTvf;
-
// This is called once, after inverting all occurrences
// of a given field in the doc. At this point we flush
// our hash into the DocWriter.
assert fieldInfo.storeTermVector;
- assert perThread.vectorFieldsInOrder(fieldInfo);
+ assert termsWriter.vectorFieldsInOrder(fieldInfo);
- perThread.doc.addField(termsHashPerField.fieldInfo.number);
TermVectorsPostingsArray postings = (TermVectorsPostingsArray) termsHashPerField.postingsArray;
+ final IndexOutput tvf = termsWriter.tvf;
// TODO: we may want to make this sort in same order
// as Codec's terms dict?
@@ -140,21 +128,21 @@ final class TermVectorsTermsWriterPerFie
byte bits = 0x0;
if (doVectorPositions)
bits |= TermVectorsReader.STORE_POSITIONS_WITH_TERMVECTOR;
- if (doVectorOffsets)
+ if (doVectorOffsets)
bits |= TermVectorsReader.STORE_OFFSET_WITH_TERMVECTOR;
tvf.writeByte(bits);
int lastLen = 0;
byte[] lastBytes = null;
int lastStart = 0;
-
- final ByteSliceReader reader = perThread.vectorSliceReader;
- final ByteBlockPool termBytePool = perThread.termsHashPerThread.termBytePool;
+
+ final ByteSliceReader reader = termsWriter.vectorSliceReader;
+ final ByteBlockPool termBytePool = termsHashPerField.termBytePool;
for(int j=0;j<numPostings;j++) {
final int termID = termIDs[j];
final int freq = postings.freqs[termID];
-
+
// Get BytesRef
termBytePool.setBytesRef(flushTerm, postings.textStarts[termID]);
@@ -192,20 +180,13 @@ final class TermVectorsTermsWriterPerFie
}
termsHashPerField.reset();
-
- // NOTE: we clear, per-field, at the thread level,
- // because term vectors fully write themselves on each
- // field; this saves RAM (eg if large doc has two large
- // fields w/ term vectors on) because we recycle/reuse
- // all RAM after each field:
- perThread.termsHashPerThread.reset(false);
}
void shrinkHash() {
termsHashPerField.shrinkHash(maxNumPostings);
maxNumPostings = 0;
}
-
+
@Override
void start(Fieldable f) {
if (doVectorOffsets) {
@@ -225,7 +206,7 @@ final class TermVectorsTermsWriterPerFie
if (doVectorOffsets) {
int startOffset = fieldState.offset + offsetAttribute.startOffset();
int endOffset = fieldState.offset + offsetAttribute.endOffset();
-
+
termsHashPerField.writeVInt(1, startOffset);
termsHashPerField.writeVInt(1, endOffset - startOffset);
postings.lastOffsets[termID] = endOffset;
@@ -243,13 +224,13 @@ final class TermVectorsTermsWriterPerFie
assert docState.testPoint("TermVectorsTermsWriterPerField.addTerm start");
TermVectorsPostingsArray postings = (TermVectorsPostingsArray) termsHashPerField.postingsArray;
-
+
postings.freqs[termID]++;
if (doVectorOffsets) {
int startOffset = fieldState.offset + offsetAttribute.startOffset();
int endOffset = fieldState.offset + offsetAttribute.endOffset();
-
+
termsHashPerField.writeVInt(1, startOffset - postings.lastOffsets[termID]);
termsHashPerField.writeVInt(1, endOffset - startOffset);
postings.lastOffsets[termID] = endOffset;
@@ -280,7 +261,7 @@ final class TermVectorsTermsWriterPerFie
int[] freqs; // How many times this term occurred in the current doc
int[] lastOffsets; // Last offset we saw
int[] lastPositions; // Last position where this term occurred
-
+
@Override
ParallelPostingsArray newInstance(int size) {
return new TermVectorsPostingsArray(size);
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsWriter.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsWriter.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/TermVectorsWriter.java Mon May 9 15:24:04 2011
@@ -20,12 +20,13 @@ package org.apache.lucene.index;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
import java.io.IOException;
final class TermVectorsWriter {
-
+
private IndexOutput tvx = null, tvd = null, tvf = null;
private FieldInfos fieldInfos;
@@ -46,7 +47,7 @@ final class TermVectorsWriter {
/**
* Add a complete document specified by all its term vectors. If document has no
* term vectors, add value for tvx.
- *
+ *
* @param vectors
* @throws IOException
*/
@@ -99,7 +100,7 @@ final class TermVectorsWriter {
final int[] freqs = vectors[i].getTermFrequencies();
for (int j=0; j<numTerms; j++) {
-
+
int start = j == 0 ? 0 : StringHelper.bytesDifference(terms[j-1].bytes,
terms[j-1].length,
terms[j].bytes,
@@ -181,30 +182,11 @@ final class TermVectorsWriter {
assert tvd.getFilePointer() == tvdPosition;
assert tvf.getFilePointer() == tvfPosition;
}
-
+
/** Close all streams. */
final void close() throws IOException {
// make an effort to close all streams we can but remember and re-throw
// the first exception encountered in this process
- IOException keep = null;
- if (tvx != null)
- try {
- tvx.close();
- } catch (IOException e) {
- keep = e;
- }
- if (tvd != null)
- try {
- tvd.close();
- } catch (IOException e) {
- if (keep == null) keep = e;
- }
- if (tvf != null)
- try {
- tvf.close();
- } catch (IOException e) {
- if (keep == null) keep = e;
- }
- if (keep != null) throw (IOException) keep.fillInStackTrace();
+ IOUtils.closeSafely(tvx, tvd, tvf);
}
}