You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by bu...@apache.org on 2011/01/13 09:49:22 UTC
svn commit: r1058461 - /lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/
Author: buschmi
Date: Thu Jan 13 08:49:21 2011
New Revision: 1058461
URL: http://svn.apache.org/viewvc?rev=1058461&view=rev
Log:
LUCENE-2324: More progress with concurrency and per-thread pool
Added:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
Removed:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
Modified:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Jan 13 08:49:21 2011
@@ -20,6 +20,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
@@ -107,7 +109,7 @@ final class DocumentsWriter {
int numDocsInStore; // # docs written to doc stores
boolean bufferIsFull; // True when it's time to write segment
- private boolean closed;
+ private volatile boolean closed;
PrintStream infoStream;
int maxFieldLength = IndexWriterConfig.UNLIMITED_FIELD_LENGTH;
@@ -115,12 +117,10 @@ final class DocumentsWriter {
List<String> newFiles;
- private final DocumentsWriterThreadPool threadPool;
final IndexWriter indexWriter;
private AtomicInteger numDocsInRAM = new AtomicInteger(0);
private AtomicLong ramUsed = new AtomicLong(0);
- private int numDocumentsWriterPerThreads;
static class DocState {
DocumentsWriter docWriter;
@@ -160,54 +160,71 @@ final class DocumentsWriter {
private final FieldInfos fieldInfos;
final BufferedDeletes bufferedDeletes;
+ final SegmentDeletes pendingDeletes;
private final IndexWriter.FlushControl flushControl;
- private final IndexingChain chain;
+ final IndexingChain chain;
- DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
+ final DocumentsWriterPerThreadPool perThreadPool;
+
+ DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
this.directory = directory;
this.indexWriter = writer;
this.similarity = writer.getConfig().getSimilarity();
this.fieldInfos = fieldInfos;
this.bufferedDeletes = bufferedDeletes;
- this.threadPool = indexerThreadPool;
+ this.perThreadPool = indexerThreadPool;
+ this.pendingDeletes = new SegmentDeletes();
this.chain = chain;
flushControl = writer.flushControl;
+ this.perThreadPool.initialize(this);
}
- boolean deleteQueries(Query... queries) {
- final boolean doFlush = flushControl.waitUpdate(0, queries.length);
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
- while (it.hasNext()) {
- it.next().deleteQueries(queries);
+ boolean deleteQueries(final Query... queries) throws IOException {
+ Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+
+ boolean added = false;
+ while (threadsIterator.hasNext()) {
+ threadsIterator.next().perThread.deleteQueries(queries);
+ added = true;
}
- return doFlush;
- }
- boolean deleteQuery(Query query) {
- final boolean doFlush = flushControl.waitUpdate(0, 1);
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
- while (it.hasNext()) {
- it.next().deleteQuery(query);
+ if (!added) {
+ synchronized(this) {
+ for (Query query : queries) {
+ pendingDeletes.addQuery(query, SegmentDeletes.MAX_INT);
+ }
+ }
}
- return doFlush;
+
+ return true;
}
- boolean deleteTerms(Term... terms) {
- final boolean doFlush = flushControl.waitUpdate(0, terms.length);
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
- while (it.hasNext()) {
- it.next().deleteTerms(terms);
- }
- return doFlush;
+ boolean deleteQuery(final Query query) throws IOException {
+ return deleteQueries(query);
}
- boolean deleteTerm(Term term, boolean skipWait) {
- final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
- while (it.hasNext()) {
- it.next().deleteTerm(term);
+ boolean deleteTerms(final Term... terms) throws IOException {
+ Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+
+ boolean added = false;
+ while (threadsIterator.hasNext()) {
+ threadsIterator.next().perThread.deleteTerms(terms);
+ added = true;
}
- return doFlush;
+
+ if (!added) {
+ synchronized(this) {
+ for (Term term : terms) {
+ pendingDeletes.addTerm(term, SegmentDeletes.MAX_INT);
+ }
+ }
+ }
+
+ return false;
+ }
+
+ boolean deleteTerm(final Term term) throws IOException {
+ return deleteTerms(term);
}
public FieldInfos getFieldInfos() {
@@ -224,25 +241,26 @@ final class DocumentsWriter {
* here. */
synchronized void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
- while (it.hasNext()) {
- it.next().docState.infoStream = infoStream;
- }
+ pushConfigChange();
}
synchronized void setMaxFieldLength(int maxFieldLength) {
this.maxFieldLength = maxFieldLength;
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
- while (it.hasNext()) {
- it.next().docState.maxFieldLength = maxFieldLength;
- }
+ pushConfigChange();
}
synchronized void setSimilarity(Similarity similarity) {
this.similarity = similarity;
- Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
+ pushConfigChange();
+ }
+
+ private final void pushConfigChange() {
+ Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
while (it.hasNext()) {
- it.next().docState.similarity = similarity;
+ DocumentsWriterPerThread perThread = it.next().perThread;
+ perThread.docState.infoStream = this.infoStream;
+ perThread.docState.maxFieldLength = this.maxFieldLength;
+ perThread.docState.similarity = this.similarity;
}
}
@@ -300,14 +318,25 @@ final class DocumentsWriter {
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
synchronized void abort() throws IOException {
- if (infoStream != null) {
- message("docWriter: abort");
- }
-
boolean success = false;
+
try {
+ if (infoStream != null) {
+ message("docWriter: abort");
+ }
+
+ Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+
+ while (threadsIterator.hasNext()) {
+ ThreadState perThread = threadsIterator.next();
+ perThread.lock();
+ try {
+ perThread.perThread.abort();
+ } finally {
+ perThread.unlock();
+ }
+ }
- threadPool.abort();
success = true;
} finally {
notifyAll();
@@ -324,16 +353,12 @@ final class DocumentsWriter {
}
// for testing
- public SegmentDeletes getPendingDeletes() {
- return null;
- // nocommit
- //return pendingDeletes;
+ public synchronized SegmentDeletes getPendingDeletes() {
+ return pendingDeletes;
}
- public boolean anyDeletions() {
- // nocommit
- return true;
- //return pendingDeletes.any();
+ public synchronized boolean anyDeletions() {
+ return pendingDeletes.any();
}
synchronized void close() {
@@ -341,52 +366,54 @@ final class DocumentsWriter {
notifyAll();
}
- synchronized DocumentsWriterPerThread newDocumentsWriterPerThread() {
- DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(directory, this, chain);
- numDocumentsWriterPerThreads++;
- return perThread;
- }
-
boolean updateDocument(final Document doc, final Analyzer analyzer, final Term delTerm)
throws CorruptIndexException, IOException {
+ ensureOpen();
- boolean flushed = threadPool.executePerThread(this, doc,
- new DocumentsWriterThreadPool.PerThreadTask<Boolean>() {
- @Override
- public Boolean process(final DocumentsWriterPerThread perThread) throws IOException {
- long perThreadRAMUsedBeforeAdd = perThread.bytesUsed();
- perThread.addDocument(doc, analyzer);
-
- synchronized(DocumentsWriter.this) {
- ensureOpen();
- if (delTerm != null) {
- deleteTerm(delTerm, true);
- }
- perThread.commitDocument();
- numDocsInRAM.incrementAndGet();
- }
-
- if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) {
- super.clearThreadBindings();
- return true;
- }
- return false;
- }
- });
+ Collection<String> flushedFiles = null;
+ SegmentInfo newSegment = null;
+
+ ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
+ try {
+ DocumentsWriterPerThread dwpt = perThread.perThread;
+ long perThreadRAMUsedBeforeAdd = dwpt.bytesUsed();
+ dwpt.addDocument(doc, analyzer);
+
+ synchronized(DocumentsWriter.this) {
+ if (delTerm != null) {
+ deleteTerm(delTerm);
+ }
+ dwpt.commitDocument();
+ numDocsInRAM.incrementAndGet();
+ }
+
+ newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
+ if (newSegment != null) {
+ perThreadPool.clearThreadBindings(perThread);
+ flushedFiles = new HashSet<String>();
+ flushedFiles.addAll(dwpt.flushState.flushedFiles);
+ }
- if (flushed) {
- indexWriter.maybeMerge();
+ } finally {
+ perThread.unlock();
+ }
+
+ if (newSegment != null) {
+ finishFlushedSegment(newSegment, flushedFiles);
return true;
}
return false;
}
- private final boolean finishAddDocument(DocumentsWriterPerThread perThread,
+ private final SegmentInfo finishAddDocument(DocumentsWriterPerThread perThread,
long perThreadRAMUsedBeforeAdd) throws IOException {
+ SegmentInfo newSegment = null;
+
int numDocsPerThread = perThread.getNumDocsInRAM();
- boolean flushed = maybeFlushPerThread(perThread);
- if (flushed) {
+ if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
+ newSegment = perThread.flush();
+
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numDocsPerThread)) {
oldValue = numDocsInRAM.get();
@@ -399,81 +426,73 @@ final class DocumentsWriter {
oldValue = ramUsed.get();
}
- return flushed;
+ return newSegment;
}
- private boolean flushSegment(DocumentsWriterPerThread perThread) throws IOException {
- if (perThread.getNumDocsInRAM() == 0) {
- return false;
- }
+ final boolean flushAllThreads(final boolean flushDeletes)
+ throws IOException {
- SegmentInfo newSegment = perThread.flush();
- newSegment.dir = indexWriter.getDirectory();
+ if (flushDeletes) {
+ if (indexWriter.segmentInfos.size() > 0 && pendingDeletes.any()) {
+ bufferedDeletes.pushDeletes(pendingDeletes, indexWriter.segmentInfos.lastElement(), true);
+ pendingDeletes.clear();
+ }
+ }
- finishFlushedSegment(newSegment, perThread);
- return true;
- }
+ Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+ boolean anythingFlushed = false;
- private final boolean maybeFlushPerThread(DocumentsWriterPerThread perThread) throws IOException {
- if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
- flushSegment(perThread);
- assert perThread.getNumDocsInRAM() == 0;
+ while (threadsIterator.hasNext()) {
+ Collection<String> flushedFiles = null;
+ SegmentInfo newSegment = null;
- return true;
- }
+ ThreadState perThread = threadsIterator.next();
+ perThread.lock();
+ try {
+ DocumentsWriterPerThread dwpt = perThread.perThread;
+ final int numDocs = dwpt.getNumDocsInRAM();
- return false;
- }
+ // Always flush docs if there are any
+ boolean flushDocs = numDocs > 0;
- final boolean flushAllThreads(final boolean flushDeletes)
- throws IOException {
+ String segment = dwpt.getSegment();
- return threadPool.executeAllThreads(this, new DocumentsWriterThreadPool.AllThreadsTask<Boolean>() {
- @Override
- public Boolean process(Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException {
- boolean anythingFlushed = false;
-
- while (threadsIterator.hasNext()) {
- DocumentsWriterPerThread perThread = threadsIterator.next();
- final int numDocs = perThread.getNumDocsInRAM();
-
- // Always flush docs if there are any
- boolean flushDocs = numDocs > 0;
-
- String segment = perThread.getSegment();
-
- // If we are flushing docs, segment must not be null:
- assert segment != null || !flushDocs;
-
- if (flushDocs) {
- SegmentInfo newSegment = perThread.flush();
- newSegment.dir = indexWriter.getDirectory();
-
- if (newSegment != null) {
- anythingFlushed = true;
-
- IndexWriter.setDiagnostics(newSegment, "flush");
- finishFlushedSegment(newSegment, perThread);
- }
- } else if (flushDeletes) {
- perThread.pushDeletes(null, indexWriter.segmentInfos);
+ // If we are flushing docs, segment must not be null:
+ assert segment != null || !flushDocs;
+
+ if (flushDocs) {
+ newSegment = dwpt.flush();
+
+ if (newSegment != null) {
+ IndexWriter.setDiagnostics(newSegment, "flush");
+ flushedFiles = new HashSet<String>();
+ flushedFiles.addAll(dwpt.flushState.flushedFiles);
+ dwpt.pushDeletes(newSegment, indexWriter.segmentInfos);
+ anythingFlushed = true;
+ perThreadPool.clearThreadBindings(perThread);
}
+ } else if (flushDeletes) {
+ dwpt.pushDeletes(null, indexWriter.segmentInfos);
}
+ } finally {
+ perThread.unlock();
+ }
- if (anythingFlushed) {
- clearThreadBindings();
- numDocsInRAM.set(0);
- }
-
- return anythingFlushed;
+ if (newSegment != null) {
+ // important do unlock the perThread before finishFlushedSegment
+ // is called to prevent deadlock on IndexWriter mutex
+ finishFlushedSegment(newSegment, flushedFiles);
}
- });
+ }
+
+ numDocsInRAM.set(0);
+ return anythingFlushed;
}
/** Build compound file for the segment we just flushed */
- void createCompoundFile(String compoundFileName, DocumentsWriterPerThread perThread) throws IOException {
+ void createCompoundFile(String compoundFileName, Collection<String> flushedFiles) throws IOException {
CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
- for(String fileName : perThread.flushState.flushedFiles) {
+ for(String fileName : flushedFiles) {
cfsWriter.addFile(fileName);
}
@@ -481,16 +500,14 @@ final class DocumentsWriter {
cfsWriter.close();
}
- void finishFlushedSegment(SegmentInfo newSegment, DocumentsWriterPerThread perThread) throws IOException {
- perThread.pushDeletes(newSegment, indexWriter.segmentInfos);
-
+ void finishFlushedSegment(SegmentInfo newSegment, Collection<String> flushedFiles) throws IOException {
if (indexWriter.useCompoundFile(newSegment)) {
String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
message("creating compound file " + compoundFileName);
// Now build compound file
boolean success = false;
try {
- createCompoundFile(compoundFileName, perThread);
+ createCompoundFile(compoundFileName, flushedFiles);
success = true;
} finally {
if (!success) {
@@ -501,14 +518,14 @@ final class DocumentsWriter {
indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "",
IndexFileNames.COMPOUND_FILE_EXTENSION));
- for (String file : perThread.flushState.flushedFiles) {
+ for (String file : flushedFiles) {
indexWriter.deleter.deleteFile(file);
}
}
}
- for (String file : perThread.flushState.flushedFiles) {
+ for (String file : flushedFiles) {
indexWriter.deleter.deleteFile(file);
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Thu Jan 13 08:49:21 2011
@@ -109,7 +109,7 @@ public class DocumentsWriterPerThread {
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
void abort() throws IOException {
- assert aborting;
+ aborting = true;
try {
if (infoStream != null) {
message("docWriter: now abort");
@@ -152,7 +152,6 @@ public class DocumentsWriterPerThread {
FieldInfos fieldInfos = new FieldInfos();
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
- parent.indexWriter.testPoint("DocumentsWriterPerThread.init start");
this.directory = directory;
this.parent = parent;
this.writer = parent.indexWriter;
@@ -191,7 +190,7 @@ public class DocumentsWriterPerThread {
if (!aborting) {
// mark document as deleted
deleteDocID(docState.docID);
- commitDocument();
+ numDocsInRAM++;
}
}
}
@@ -203,7 +202,7 @@ public class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
- setAborting();
+ abort();
}
}
}
@@ -249,23 +248,23 @@ public class DocumentsWriterPerThread {
// confounding exception).
}
- void deleteQueries(Query... queries) {
+ synchronized void deleteQueries(Query... queries) {
for (Query query : queries) {
pendingDeletes.addQuery(query, numDocsInRAM);
}
}
- void deleteQuery(Query query) {
+ synchronized void deleteQuery(Query query) {
pendingDeletes.addQuery(query, numDocsInRAM);
}
- void deleteTerms(Term... terms) {
+ synchronized void deleteTerms(Term... terms) {
for (Term term : terms) {
pendingDeletes.addTerm(term, numDocsInRAM);
}
}
- void deleteTerm(Term term) {
+ synchronized void deleteTerm(Term term) {
pendingDeletes.addTerm(term, numDocsInRAM);
}
@@ -350,7 +349,7 @@ public class DocumentsWriterPerThread {
return newSegment;
} finally {
if (!success) {
- setAborting();
+ abort();
}
}
}
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1058461&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Thu Jan 13 08:49:21 2011
@@ -0,0 +1,124 @@
+package org.apache.lucene.index;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+
+/**
+ * Fixed-size pool of {@link DocumentsWriterPerThread} instances, each wrapped
+ * in a lockable {@link ThreadState}. Subclasses decide how indexing threads
+ * are mapped to states via {@link #getAndLock}.
+ *
+ * <p>All slots are filled by {@link #initialize}; {@link #newThreadState}
+ * then activates them in array order until the pool is exhausted.
+ */
+public abstract class DocumentsWriterPerThreadPool {
+  /** A {@link DocumentsWriterPerThread} paired with the lock guarding its use. */
+  final static class ThreadState extends ReentrantLock {
+    final DocumentsWriterPerThread perThread;
+
+    ThreadState(DocumentsWriterPerThread perThread) {
+      this.perThread = perThread;
+    }
+  }
+
+  private final ThreadState[] perThreads;
+  // Number of leading slots of perThreads handed out so far. Incremented only
+  // in newThreadState (under the pool's monitor) but read without locking by
+  // getActivePerThreadsIterator, hence volatile.
+  private volatile int numThreadStatesActive;
+
+  /**
+   * @param maxNumPerThreads maximum number of thread states; values below 1
+   *        fall back to {@link IndexWriterConfig#DEFAULT_MAX_THREAD_STATES}
+   */
+  public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
+    maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
+    this.perThreads = new ThreadState[maxNumPerThreads];
+
+    numThreadStatesActive = 0;
+  }
+
+  /**
+   * Creates one {@link DocumentsWriterPerThread} per slot, each using the
+   * given writer's directory and indexing chain. Until this is called every
+   * slot is {@code null}.
+   */
+  public void initialize(DocumentsWriter documentsWriter) {
+    for (int i = 0; i < perThreads.length; i++) {
+      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, documentsWriter.chain));
+    }
+  }
+
+  /** Returns the total number of slots, active or not. */
+  public int getMaxThreadStates() {
+    return perThreads.length;
+  }
+
+  /**
+   * Activates and returns the next unused thread state, or {@code null} if
+   * every slot is already active.
+   *
+   * <p>Synchronized because the check-then-increment of
+   * {@code numThreadStatesActive} is not atomic (volatile only gives
+   * visibility): without the lock two concurrent callers could receive the
+   * same {@link ThreadState} and the counter could skip a slot.
+   */
+  public synchronized ThreadState newThreadState() {
+    if (numThreadStatesActive < perThreads.length) {
+      ThreadState state = perThreads[numThreadStatesActive];
+      numThreadStatesActive++;
+      return state;
+    }
+
+    return null;
+  }
+
+  public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
+
+  public abstract void clearThreadBindings(ThreadState perThread);
+
+  public abstract void clearAllThreadBindings();
+
+  /** Iterates over every slot, including states not yet activated. */
+  public Iterator<ThreadState> getAllPerThreadsIterator() {
+    return getPerThreadsIterator(this.perThreads.length);
+  }
+
+  /**
+   * Iterates over the states activated so far; the active count is read once,
+   * when the iterator is created.
+   */
+  public Iterator<ThreadState> getActivePerThreadsIterator() {
+    return getPerThreadsIterator(this.numThreadStatesActive);
+  }
+
+  /** Read-only iterator over perThreads[0, upto). */
+  private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
+    return new Iterator<ThreadState>() {
+      int i = 0;
+
+      @Override
+      public boolean hasNext() {
+        return i < upto;
+      }
+
+      @Override
+      public ThreadState next() {
+        // Honor the Iterator contract instead of reading past upto
+        // (which could expose an inactive slot or throw AIOOBE).
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return perThreads[i++];
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("remove() not supported.");
+      }
+    };
+  }
+}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu Jan 13 08:49:21 2011
@@ -1275,7 +1275,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteTerm(term, false)) {
+ if (docWriter.deleteTerm(term)) {
flush(true, false);
}
} catch (OutOfMemoryError oom) {
@@ -1396,10 +1396,11 @@ public class IndexWriter implements Clos
public void updateDocument(Term term, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
ensureOpen();
+ boolean maybeMerge = false;
try {
boolean success = false;
try {
- docWriter.updateDocument(doc, analyzer, term);
+ maybeMerge = docWriter.updateDocument(doc, analyzer, term);
success = true;
} finally {
if (!success && infoStream != null)
@@ -1408,6 +1409,10 @@ public class IndexWriter implements Clos
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocument");
}
+
+ if (maybeMerge) {
+ maybeMerge();
+ }
}
// for test purpose
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Thu Jan 13 08:49:21 2011
@@ -31,12 +31,12 @@ import org.apache.lucene.util.Version;
* <p>
* All setter methods return {@link IndexWriterConfig} to allow chaining
* settings conveniently. Thus someone can do:
- *
+ *
* <pre>
* IndexWriterConfig conf = new IndexWriterConfig(analyzer);
* conf.setter1().setter2();
* </pre>
- *
+ *
* @since 3.1
*/
public final class IndexWriterConfig implements Cloneable {
@@ -53,7 +53,7 @@ public final class IndexWriterConfig imp
* </ul>
*/
public static enum OpenMode { CREATE, APPEND, CREATE_OR_APPEND }
-
+
/** Default value is 32. Change using {@link #setTermIndexInterval(int)}. */
public static final int DEFAULT_TERM_INDEX_INTERVAL = 32;
@@ -74,7 +74,7 @@ public final class IndexWriterConfig imp
/**
* Default value for the write lock timeout (1,000 ms).
- *
+ *
* @see #setDefaultWriteLockTimeout(long)
*/
public static long WRITE_LOCK_TIMEOUT = 1000;
@@ -102,7 +102,7 @@ public final class IndexWriterConfig imp
/**
* Returns the default write lock timeout for newly instantiated
* IndexWriterConfigs.
- *
+ *
* @see #setDefaultWriteLockTimeout(long)
*/
public static long getDefaultWriteLockTimeout() {
@@ -126,9 +126,9 @@ public final class IndexWriterConfig imp
private CodecProvider codecProvider;
private MergePolicy mergePolicy;
private boolean readerPooling;
- private DocumentsWriterThreadPool indexerThreadPool;
+ private DocumentsWriterPerThreadPool indexerThreadPool;
private int readerTermsIndexDivisor;
-
+
// required for clone
private Version matchVersion;
@@ -161,7 +161,7 @@ public final class IndexWriterConfig imp
indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
}
-
+
@Override
public Object clone() {
// Shallow clone is the only thing that's possible, since parameters like
@@ -184,7 +184,7 @@ public final class IndexWriterConfig imp
this.openMode = openMode;
return this;
}
-
+
/** Returns the {@link OpenMode} set by {@link #setOpenMode(OpenMode)}. */
public OpenMode getOpenMode() {
return openMode;
@@ -243,7 +243,7 @@ public final class IndexWriterConfig imp
/**
* Returns the maximum number of terms that will be indexed for a single field
* in a document.
- *
+ *
* @see #setMaxFieldLength(int)
*/
public int getMaxFieldLength() {
@@ -273,7 +273,7 @@ public final class IndexWriterConfig imp
* <p>
* <b>NOTE:</b> the similarity cannot be null. If <code>null</code> is passed,
* the similarity will be set to the default.
- *
+ *
* @see Similarity#setDefault(Similarity)
*/
public IndexWriterConfig setSimilarity(Similarity similarity) {
@@ -289,7 +289,7 @@ public final class IndexWriterConfig imp
public Similarity getSimilarity() {
return similarity;
}
-
+
/**
* Expert: set the interval between indexed terms. Large values cause less
* memory to be used by IndexReader, but slow random-access to terms. Small
@@ -309,7 +309,7 @@ public final class IndexWriterConfig imp
* In particular, <code>numUniqueTerms/interval</code> terms are read into
* memory by an IndexReader, and, on average, <code>interval/2</code> terms
* must be scanned for each random term access.
- *
+ *
* @see #DEFAULT_TERM_INDEX_INTERVAL
*/
public IndexWriterConfig setTermIndexInterval(int interval) {
@@ -319,7 +319,7 @@ public final class IndexWriterConfig imp
/**
* Returns the interval between indexed terms.
- *
+ *
* @see #setTermIndexInterval(int)
*/
public int getTermIndexInterval() {
@@ -355,10 +355,10 @@ public final class IndexWriterConfig imp
this.writeLockTimeout = writeLockTimeout;
return this;
}
-
+
/**
* Returns allowed timeout when acquiring the write lock.
- *
+ *
* @see #setWriteLockTimeout(long)
*/
public long getWriteLockTimeout() {
@@ -372,7 +372,7 @@ public final class IndexWriterConfig imp
* created.
* <p>Disabled by default (writer flushes by RAM usage).
- *
+ *
* @throws IllegalArgumentException if maxBufferedDeleteTerms
* is enabled but smaller than 1
* @see #setRAMBufferSizeMB
@@ -389,7 +389,7 @@ public final class IndexWriterConfig imp
/**
* Returns the number of buffered deleted terms that will trigger a flush if
* enabled.
- *
+ *
* @see #setMaxBufferedDeleteTerms(int)
*/
public int getMaxBufferedDeleteTerms() {
@@ -401,33 +401,33 @@ public final class IndexWriterConfig imp
* and deletions before they are flushed to the Directory. Generally for
* faster indexing performance it's best to flush by RAM usage instead of
* document count and use as large a RAM buffer as you can.
- *
+ *
* <p>
* When this is set, the writer will flush whenever buffered documents and
* deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
* triggering a flush due to RAM usage. Note that if flushing by document
* count is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ *
* <p>
* <b>NOTE</b>: the account of RAM usage for pending deletions is only
* approximate. Specifically, if you delete by Query, Lucene currently has no
* way to measure the RAM usage of individual Queries so the accounting will
* under-estimate and you should compensate by either calling commit()
* periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
- * to flush by count instead of RAM usage (each buffered delete Query counts
+ * to flush by count instead of RAM usage (each buffered delete Query counts
* as one).
- *
+ *
* <p>
* <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
* internal storage, the absolute maximum value for this setting is somewhat
* less than 2048 MB. The precise limit depends on various factors, such as
* how large your documents are, how many fields have norms, etc., so it's
* best to set this value comfortably under 2048.
- *
+ *
* <p>
* The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
- *
+ *
* @throws IllegalArgumentException
* if ramBufferSize is enabled but non-positive, or it disables
* ramBufferSize when maxBufferedDocs is already disabled
@@ -456,19 +456,19 @@ public final class IndexWriterConfig imp
* Determines the minimal number of documents required before the buffered
* in-memory documents are flushed as a new Segment. Large values generally
* give faster indexing.
- *
+ *
* <p>
* When this is set, the writer will flush every maxBufferedDocs added
* documents. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent triggering a
* flush due to number of buffered documents. Note that if flushing by RAM
* usage is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ *
* <p>
* Disabled by default (writer flushes by RAM usage).
- *
+ *
* @see #setRAMBufferSizeMB(double)
- *
+ *
* @throws IllegalArgumentException
* if maxBufferedDocs is enabled but smaller than 2, or it disables
* maxBufferedDocs when ramBufferSize is already disabled
@@ -488,7 +488,7 @@ public final class IndexWriterConfig imp
/**
* Returns the number of buffered added documents that will trigger a flush if
* enabled.
- *
+ *
* @see #setMaxBufferedDocs(int)
*/
public int getMaxBufferedDocs() {
@@ -529,10 +529,10 @@ public final class IndexWriterConfig imp
return codecProvider;
}
-
+
/**
* Returns the current MergePolicy in use by this writer.
- *
+ *
* @see #setMergePolicy(MergePolicy)
*/
public MergePolicy getMergePolicy() {
@@ -545,15 +545,15 @@ public final class IndexWriterConfig imp
* <code>maxThreadStates</code> will be set to
* {@link #DEFAULT_MAX_THREAD_STATES}.
*/
- public IndexWriterConfig setIndexerThreadPool(DocumentsWriterThreadPool threadPool) {
+ public IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
this.indexerThreadPool = threadPool;
return this;
}
- public DocumentsWriterThreadPool getIndexerThreadPool() {
+ public DocumentsWriterPerThreadPool getIndexerThreadPool() {
return this.indexerThreadPool;
}
-
+
/** Returns the max number of simultaneous threads that
* may be indexing documents at once in IndexWriter. */
public int getMaxThreadStates() {
@@ -584,7 +584,7 @@ public final class IndexWriterConfig imp
this.indexingChain = indexingChain == null ? DocumentsWriterPerThread.defaultIndexingChain : indexingChain;
return this;
}
-
+
/** Returns the indexing chain set on {@link #setIndexingChain(IndexingChain)}. */
IndexingChain getIndexingChain() {
return indexingChain;
@@ -606,7 +606,7 @@ public final class IndexWriterConfig imp
public int getReaderTermsIndexDivisor() {
return readerTermsIndexDivisor;
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Thu Jan 13 08:49:21 2011
@@ -1,66 +1,56 @@
package org.apache.lucene.index;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.document.Document;
-public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterThreadPool {
- private static final class AffinityThreadState extends ThreadState {
- int numAssignedThreads;
-
- @Override
- void finish() {
- numAssignedThreads--;
- }
- }
+public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
+ private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
- private Map<Thread, AffinityThreadState> threadBindings = new HashMap<Thread, AffinityThreadState>();
-
- public ThreadAffinityDocumentsWriterThreadPool(int maxNumThreadStates) {
- super(maxNumThreadStates);
+ public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
+ super(maxNumPerThreads);
}
@Override
- protected ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
- AffinityThreadState threadState = threadBindings.get(requestingThread);
- // First, find a thread state. If this thread already
- // has affinity to a specific ThreadState, use that one
- // again.
- if (threadState == null) {
- AffinityThreadState minThreadState = null;
- for(int i=0;i<allThreadStates.length;i++) {
- AffinityThreadState ts = (AffinityThreadState) allThreadStates[i];
- if (minThreadState == null || ts.numAssignedThreads < minThreadState.numAssignedThreads)
- minThreadState = ts;
+ public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
+ ThreadState threadState = threadBindings.get(requestingThread);
+ if (threadState != null) {
+ if (threadState.tryLock()) {
+ return threadState;
}
- if (minThreadState != null && (minThreadState.numAssignedThreads == 0 || allThreadStates.length >= maxNumThreadStates)) {
- threadState = minThreadState;
- } else {
- threadState = addNewThreadState(documentsWriter, new AffinityThreadState());
+ }
+
+ // find the state that has minimum amount of threads waiting
+ Iterator<ThreadState> it = getActivePerThreadsIterator();
+ ThreadState minThreadState = null;
+ while (it.hasNext()) {
+ ThreadState state = it.next();
+ if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
+ minThreadState = state;
}
- threadBindings.put(requestingThread, threadState);
}
- threadState.numAssignedThreads++;
- return threadState;
+ if (minThreadState == null || minThreadState.hasQueuedThreads()) {
+ ThreadState newState = newThreadState();
+ if (newState != null) {
+ minThreadState = newState;
+ threadBindings.put(requestingThread, newState);
+ }
+ }
+
+ minThreadState.lock();
+ return minThreadState;
}
@Override
- protected void clearThreadBindings(ThreadState flushedThread) {
- Iterator<Entry<Thread, AffinityThreadState>> it = threadBindings.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Thread, AffinityThreadState> e = it.next();
- if (e.getValue() == flushedThread) {
- it.remove();
- }
- }
+ public void clearThreadBindings(ThreadState perThread) {
+ threadBindings.clear();
}
@Override
- protected void clearAllThreadBindings() {
+ public void clearAllThreadBindings() {
threadBindings.clear();
}
}