You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/03/30 15:30:07 UTC
svn commit: r1086947 [1/2] - in
/lucene/dev/branches/realtime_search/lucene/src:
java/org/apache/lucene/index/ test-framework/org/apache/lucene/store/
test-framework/org/apache/lucene/util/ test/org/apache/lucene/index/
Author: simonw
Date: Wed Mar 30 13:30:07 2011
New Revision: 1086947
URL: http://svn.apache.org/viewvc?rev=1086947&view=rev
Log:
LUCENE-2573: Tiered flushing of DWPTs by RAM with low/high water marks
Added:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java
lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
Modified:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Wed Mar 30 13:30:07 2011
@@ -72,13 +72,18 @@ class BufferedDeletes {
public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
- final AtomicLong bytesUsed = new AtomicLong();
+ final AtomicLong bytesUsed;
private final static boolean VERBOSE_DELETES = false;
long gen;
-
public BufferedDeletes(boolean sortTerms) {
+ this(sortTerms, new AtomicLong());
+ }
+
+ BufferedDeletes(boolean sortTerms, AtomicLong bytesUsed) {
+ assert bytesUsed != null;
+ this.bytesUsed = bytesUsed;
if (sortTerms) {
terms = new TreeMap<Term,Integer>();
} else {
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Wed Mar 30 13:30:07 2011
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
@@ -104,10 +103,8 @@ import org.apache.lucene.store.Directory
*/
final class DocumentsWriter {
- final AtomicLong bytesUsed = new AtomicLong(0);
Directory directory;
- boolean bufferIsFull; // True when it's time to write segment
private volatile boolean closed;
PrintStream infoStream;
@@ -118,25 +115,36 @@ final class DocumentsWriter {
final IndexWriter indexWriter;
private AtomicInteger numDocsInRAM = new AtomicInteger(0);
- private AtomicLong ramUsed = new AtomicLong(0);
final BufferedDeletesStream bufferedDeletesStream;
// TODO: cutover to BytesRefHash
- private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
+ private final BufferedDeletes pendingDeletes = new BufferedDeletes(false);
final IndexingChain chain;
- private final IndexWriterConfig config;
final DocumentsWriterPerThreadPool perThreadPool;
+ final FlushPolicy flushPolicy;
+ final DocumentsWriterFlushControl flushControl;
+ final Healthiness healthiness;
DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
BufferedDeletesStream bufferedDeletesStream) throws IOException {
this.directory = directory;
this.indexWriter = writer;
- this.similarityProvider = writer.getConfig().getSimilarityProvider();
+ this.similarityProvider = config.getSimilarityProvider();
this.bufferedDeletesStream = bufferedDeletesStream;
this.perThreadPool = config.getIndexerThreadPool();
this.chain = config.getIndexingChain();
this.perThreadPool.initialize(this, globalFieldNumbers, config);
- this.config = config;
+ final FlushPolicy configuredPolicy = config.getFlushPolicy();
+ if (configuredPolicy == null) {
+ flushPolicy = new FlushByRamOrCountsPolicy();
+ } else {
+ flushPolicy = configuredPolicy;
+ }
+ flushPolicy.init(this);
+
+ healthiness = new Healthiness();
+ final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
+ flushControl = new DocumentsWriterFlushControl(flushPolicy, perThreadPool, healthiness, pendingDeletes, maxRamPerDWPT);
}
boolean deleteQueries(final Query... queries) throws IOException {
@@ -146,13 +154,15 @@ final class DocumentsWriter {
}
}
- Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+ final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
while (threadsIterator.hasNext()) {
ThreadState state = threadsIterator.next();
state.lock();
try {
- state.perThread.deleteQueries(queries);
+ if (state.isActive()) {
+ state.perThread.deleteQueries(queries);
+ }
} finally {
state.unlock();
}
@@ -178,12 +188,17 @@ final class DocumentsWriter {
ThreadState state = threadsIterator.next();
state.lock();
try {
- state.perThread.deleteTerms(terms);
+ if (state.isActive()) {
+ state.perThread.deleteTerms(terms);
+ flushControl.doOnDelete(state);
+ }
} finally {
state.unlock();
}
}
-
+ if (flushControl.flushDeletes.getAndSet(false)) {
+ flushDeletes();
+ }
return false;
}
@@ -194,7 +209,7 @@ final class DocumentsWriter {
return deleteTerms(term);
}
- void deleteTerm(final Term term, ThreadState exclude) {
+ void deleteTerm(final Term term, ThreadState exclude) throws IOException {
synchronized(this) {
pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
}
@@ -207,11 +222,21 @@ final class DocumentsWriter {
state.lock();
try {
state.perThread.deleteTerms(term);
+ flushControl.doOnDelete(state);
} finally {
state.unlock();
}
}
}
+ if (flushControl.flushDeletes.getAndSet(false)) {
+ flushDeletes();
+ }
+ }
+
+ private void flushDeletes() throws IOException {
+ maybePushPendingDeletes();
+ indexWriter.applyAllDeletes();
+ indexWriter.flushCount.incrementAndGet();
}
/** If non-null, various details of indexing are printed
@@ -221,11 +246,6 @@ final class DocumentsWriter {
pushConfigChange();
}
- synchronized void setSimilarityProvider(SimilarityProvider similarityProvider) {
- this.similarityProvider = similarityProvider;
- pushConfigChange();
- }
-
private final void pushConfigChange() {
Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
while (it.hasNext()) {
@@ -245,9 +265,11 @@ final class DocumentsWriter {
return abortedFiles;
}
- void message(String message) {
+ // returns boolean for asserts
+ boolean message(String message) {
if (infoStream != null)
indexWriter.message("DW: " + message);
+ return true;
}
private void ensureOpen() throws AlreadyClosedException {
@@ -272,13 +294,18 @@ final class DocumentsWriter {
message("docWriter: abort");
}
- Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+ final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
while (threadsIterator.hasNext()) {
ThreadState perThread = threadsIterator.next();
perThread.lock();
try {
- perThread.perThread.abort();
+ if (perThread.isActive()) { // we might be closed
+ perThread.perThread.abort();
+ perThread.perThread.checkAndResetHasAborted();
+ } else {
+ assert closed;
+ }
} finally {
perThread.unlock();
}
@@ -297,26 +324,12 @@ final class DocumentsWriter {
}
public int getBufferedDeleteTermsSize() {
- int size = 0;
- Iterator<ThreadState> it = perThreadPool.getActivePerThreadsIterator();
- while (it.hasNext()) {
- DocumentsWriterPerThread dwpt = it.next().perThread;
- size += dwpt.pendingDeletes.terms.size();
- }
- size += pendingDeletes.terms.size();
- return size;
+ return pendingDeletes.terms.size();
}
//for testing
public int getNumBufferedDeleteTerms() {
- int numDeletes = 0;
- Iterator<ThreadState> it = perThreadPool.getActivePerThreadsIterator();
- while (it.hasNext()) {
- DocumentsWriterPerThread dwpt = it.next().perThread;
- numDeletes += dwpt.pendingDeletes.numTermDeletes.get();
- }
- numDeletes += pendingDeletes.numTermDeletes.get();
- return numDeletes;
+ return pendingDeletes.numTermDeletes.get();
}
public boolean anyDeletions() {
@@ -325,67 +338,89 @@ final class DocumentsWriter {
void close() {
closed = true;
+ flushControl.setClosed();
}
- boolean updateDocument(final Document doc, final Analyzer analyzer, final Term delTerm)
- throws CorruptIndexException, IOException {
+ boolean updateDocument(final Document doc, final Analyzer analyzer,
+ final Term delTerm) throws CorruptIndexException, IOException {
ensureOpen();
-
- FlushedSegment newSegment = null;
-
- ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
+ boolean maybeMerge = false;
+ final boolean isUpdate = delTerm != null;
+ if (healthiness.isStalled()) {
+ /*
+ * if we are allowed to hijack threads for flushing we try to flush out
+ * as many pending DWPT to release memory and get back healthy status.
+ */
+ if (infoStream != null) {
+ message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment");
+ }
+ // try pick up pending threads here if possile
+ final DocumentsWriterPerThread flushingDWPT;
+ flushingDWPT = flushControl.getFlushIfPending(null);
+ // don't push the delete here since the update could fail!
+ maybeMerge = doFlush(flushingDWPT);
+ if (infoStream != null && healthiness.isStalled()) {
+ message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore");
+ }
+ healthiness.waitIfStalled(); // block if stalled
+ }
+ ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+ this, doc);
+ DocumentsWriterPerThread flushingDWPT = null;
try {
- DocumentsWriterPerThread dwpt = perThread.perThread;
- long perThreadRAMUsedBeforeAdd = dwpt.bytesUsed();
- dwpt.updateDocument(doc, analyzer, delTerm);
+ if (!perThread.isActive()) {
+ ensureOpen();
+ assert false: "perThread is not active but we are still open";
+ }
+ final DocumentsWriterPerThread dwpt = perThread.perThread;
+ try {
+ dwpt.updateDocument(doc, analyzer, delTerm);
+ } finally {
+ if(dwpt.checkAndResetHasAborted()) {
+ flushControl.doOnAbort(perThread);
+ }
+ }
+ flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
numDocsInRAM.incrementAndGet();
-
- newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
} finally {
perThread.unlock();
}
-
// delete term from other DWPTs later, so that this thread
// doesn't have to lock multiple DWPTs at the same time
- if (delTerm != null) {
+ if (isUpdate) {
deleteTerm(delTerm, perThread);
}
+ maybeMerge |= doFlush(flushingDWPT);
+ return maybeMerge;
+ }
+
+
- if (newSegment != null) {
- finishFlushedSegment(newSegment);
- }
-
- if (newSegment != null) {
- perThreadPool.clearThreadBindings(perThread);
- return true;
- }
-
- return false;
+ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
+ boolean maybeMerge = false;
+ while (flushingDWPT != null) {
+ maybeMerge = true;
+ try {
+ // flush concurrently without locking
+ final FlushedSegment newSegment = flushingDWPT.flush();
+ finishFlushedSegment(newSegment);
+ } finally {
+ flushControl.doAfterFlush(flushingDWPT);
+ flushingDWPT.checkAndResetHasAborted();
+ indexWriter.flushCount.incrementAndGet();
+ }
+ flushingDWPT = flushControl.nextPendingFlush() ;
}
+ return maybeMerge;
+ }
+
- private void finishFlushedSegment(FlushedSegment newSegment) throws IOException {
+ private void finishFlushedSegment(FlushedSegment newSegment)
+ throws IOException {
pushDeletes(newSegment);
if (newSegment != null) {
indexWriter.addFlushedSegment(newSegment);
- }
- }
-
- private final FlushedSegment finishAddDocument(DocumentsWriterPerThread perThread,
- long perThreadRAMUsedBeforeAdd) throws IOException {
- FlushedSegment newSegment = null;
- final int maxBufferedDocs = config.getMaxBufferedDocs();
- if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- perThread.getNumDocsInRAM() >= maxBufferedDocs) {
- newSegment = perThread.flush();
- }
-
- long deltaRAM = perThread.bytesUsed() - perThreadRAMUsedBeforeAdd;
- long oldValue = ramUsed.get();
- while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) {
- oldValue = ramUsed.get();
}
-
- return newSegment;
}
final void subtractFlushedNumDocs(int numFlushed) {
@@ -402,66 +437,79 @@ final class DocumentsWriter {
final long delGen = bufferedDeletesStream.getNextGen();
// Lock order: DW -> BD
if (deletes != null && deletes.any()) {
- final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, delGen);
- if (infoStream != null) {
- message("flush: push buffered deletes");
- }
- bufferedDeletesStream.push(packet);
- if (infoStream != null) {
- message("flush: delGen=" + packet.gen);
- }
- }
- flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
- }
+ final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes,
+ delGen);
+ if (infoStream != null) {
+ message("flush: push buffered deletes");
+ }
+ bufferedDeletesStream.push(packet);
+ if (infoStream != null) {
+ message("flush: delGen=" + packet.gen);
}
+ }
+ flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
+ }
+ }
private synchronized final void maybePushPendingDeletes() {
final long delGen = bufferedDeletesStream.getNextGen();
if (pendingDeletes.any()) {
- bufferedDeletesStream.push(new FrozenBufferedDeletes(pendingDeletes, delGen));
+ indexWriter.bufferedDeletesStream.push(new FrozenBufferedDeletes(
+ pendingDeletes, delGen));
pendingDeletes.clear();
- }
}
+ }
final boolean flushAllThreads(final boolean flushDeletes)
throws IOException {
- Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+ final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
boolean anythingFlushed = false;
while (threadsIterator.hasNext()) {
- FlushedSegment newSegment = null;
-
- ThreadState perThread = threadsIterator.next();
- perThread.lock();
+ final ThreadState perThread = threadsIterator.next();
+ final DocumentsWriterPerThread flushingDWPT;
+ /*
+ * TODO: maybe we can leverage incoming / indexing threads here if we mark
+ * all active threads pending so that we don't need to block until we got
+ * the handle. Yet, we need to figure out how to identify that a certain
+ * DWPT has been flushed since they are simply replaced once checked out
+ * for flushing. This would give us another level of concurrency during
+ * commit.
+ *
+ * Maybe we simply iterate them and store the ThreadStates and mark
+ * all as flushPending and at the same time record the DWPT instance as a
+ * key for the pending ThreadState. This way we can easily iterate until
+ * all DWPT have changed.
+ */
+ perThread.lock();
try {
-
- DocumentsWriterPerThread dwpt = perThread.perThread;
- final int numDocs = dwpt.getNumDocsInRAM();
-
+ if (!perThread.isActive()) {
+ assert closed;
+ continue; //this perThread is already done maybe by a concurrently indexing thread
+ }
+ final DocumentsWriterPerThread dwpt = perThread.perThread;
// Always flush docs if there are any
- boolean flushDocs = numDocs > 0;
-
- String segment = dwpt.getSegment();
-
+ final boolean flushDocs = dwpt.getNumDocsInRAM() > 0;
+ final String segment = dwpt.getSegment();
// If we are flushing docs, segment must not be null:
assert segment != null || !flushDocs;
-
if (flushDocs) {
- newSegment = dwpt.flush();
-
- if (newSegment != null) {
- perThreadPool.clearThreadBindings(perThread);
- }
+ // check out and set pending if not already set
+ flushingDWPT = flushControl.tryCheckoutForFlush(perThread, true);
+ assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+ assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+ try {
+ final FlushedSegment newSegment = dwpt.flush();
+ anythingFlushed = true;
+ finishFlushedSegment(newSegment);
+ } finally {
+ flushControl.doAfterFlush(flushingDWPT);
}
+ }
} finally {
perThread.unlock();
}
-
- if (newSegment != null) {
- anythingFlushed = true;
- finishFlushedSegment(newSegment);
- }
}
if (!anythingFlushed && flushDeletes) {
@@ -471,6 +519,10 @@ final class DocumentsWriter {
return anythingFlushed;
}
+
+
+
+
// /* We have three pools of RAM: Postings, byte blocks
// * (holds freq/prox posting data) and per-doc buffers
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,263 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * This class controls {@link DocumentsWriterPerThread} flushing during
+ * indexing. It tracks the memory consumption per
+ * {@link DocumentsWriterPerThread} and uses a configured {@link FlushPolicy} to
+ * decide if a {@link DocumentsWriterPerThread} must flush.
+ * <p>
+ * In addition to the {@link FlushPolicy}, the flush control marks a
+ * {@link DocumentsWriterPerThread} as flush pending once it exceeds the
+ * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
+ * space exhaustion.
+ * <p>
+ * Byte accounting: {@code activeBytes} holds the committed RAM of all
+ * {@link ThreadState}s that are not flush pending; {@code flushBytes} holds the
+ * RAM of states that are flush pending plus DWPTs that have been checked out
+ * and are still flushing. A state moves from active to pending in
+ * {@link #setFlushPending(ThreadState)}, from pending to flushing in
+ * {@code tryCheckoutForFlush}, and its bytes are released again in
+ * {@code doAfterFlush} (or in {@code doOnAbort} if its DWPT aborted).
+ * <p>
+ * Thread-safety: the byte counters, the pending/flushing counts and
+ * {@code flushingWriters} are only mutated while holding this object's
+ * monitor. {@code numPending} and {@code numFlushing} are volatile so that
+ * {@code getFlushIfPending}, {@code nextPendingFlush} and
+ * {@code numFlushingDWPT} can read them without synchronizing.
+ */
+public final class DocumentsWriterFlushControl {
+
+ private final long maxBytesPerDWPT; // per-DWPT hard RAM limit in bytes, enforced in doAfterDocument
+ private long activeBytes = 0; // committed bytes of ThreadStates that are not flush pending
+ private long flushBytes = 0; // bytes of flush-pending states plus checked-out (flushing) DWPTs
+ private volatile int numPending = 0; // states marked flush pending but not yet checked out
+ private volatile int numFlushing = 0; // DWPTs checked out for flushing, not yet passed to doAfterFlush
+ final AtomicBoolean flushDeletes = new AtomicBoolean(false); // set by setFlushDeletes(); DocumentsWriter clears it via getAndSet(false)
+
+ long peakActiveBytes = 0;// only updated inside an assert (see updatePeaks)
+ long peakFlushBytes = 0;// only updated inside an assert (see updatePeaks)
+ long peakNetBytes = 0;// only updated inside an assert (see updatePeaks)
+ private final Healthiness healthiness;
+ private final DocumentsWriterPerThreadPool perThreadPool;
+ private final FlushPolicy flushPolicy;
+ private boolean closed = false; // passed to replaceForFlush; set once by setClosed()
+ private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>(); // flushing DWPT -> bytes it held at checkout
+ private final BufferedDeletes pendingDeletes; // global deletes; only read by getNumGlobalTermDeletes
+ /**
+ * Creates a flush control; all arguments are stored as-is.
+ *
+ * @param maxBytesPerDWPT per-DWPT hard RAM limit in bytes. NOTE(review):
+ *        DocumentsWriter computes it as
+ *        {@code config.getRAMPerThreadHardLimitMB() * 1024 * 1024}; if that
+ *        getter returns an int, the product is int arithmetic and overflows
+ *        at 2048 MB, so consider {@code 1024L} at the call site.
+ */
+ DocumentsWriterFlushControl(FlushPolicy flushPolicy,
+ DocumentsWriterPerThreadPool threadPool, Healthiness healthiness,
+ BufferedDeletes pendingDeletes, long maxBytesPerDWPT) {
+ this.healthiness = healthiness;
+ this.perThreadPool = threadPool;
+ this.flushPolicy = flushPolicy;
+ this.maxBytesPerDWPT = maxBytesPerDWPT;
+ this.pendingDeletes = pendingDeletes;
+ }
+ /** Returns the committed bytes of all ThreadStates that are not flush pending. */
+ public synchronized long activeBytes() {
+ return activeBytes;
+ }
+ /** Returns the bytes of flush-pending states plus DWPTs currently being flushed. */
+ public synchronized long flushBytes() {
+ return flushBytes;
+ }
+ /** Returns {@code flushBytes + activeBytes}, read consistently under this monitor. */
+ public synchronized long netBytes() {
+ return flushBytes + activeBytes;
+ }
+ /**
+ * Folds the RAM growth of {@code perThread}'s DWPT since the last commit
+ * into {@code perThread.perThreadBytes}, and into either {@code flushBytes}
+ * (if the state is already flush pending) or {@code activeBytes}.
+ * Not synchronized itself; its only caller, {@code doAfterDocument}, holds
+ * this monitor.
+ */
+ private void commitPerThreadBytes(ThreadState perThread) {
+ final long delta = perThread.perThread.bytesUsed()
+ - perThread.perThreadBytes;
+ perThread.perThreadBytes += delta;
+ /*
+ * We need to differentiate here if we are pending since setFlushPending
+ * moves the perThread memory to the flushBytes and we could be set to
+ * pending during a delete
+ */
+ if (perThread.flushPending) {
+ flushBytes += delta;
+ } else {
+ activeBytes += delta;
+ }
+ assert updatePeaks(delta);
+ }
+ /**
+ * Records high-water marks of active, flush and net bytes. Always returns
+ * {@code true} so it can be invoked from an {@code assert}; the
+ * {@code delta} argument is currently unused.
+ */
+ private boolean updatePeaks(long delta) {
+ peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
+ peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
+ peakNetBytes = Math.max(peakNetBytes, netBytes());
+ return true;
+ }
+ /**
+ * Called after {@code perThread} indexed a document. DocumentsWriter's
+ * updateDocument calls this while holding the state's lock. The method
+ * commits the RAM delta. If the state is not yet flush pending, it consults
+ * the {@link FlushPolicy} ({@code onUpdate} for updates, {@code onInsert}
+ * otherwise) and then enforces {@code maxBytesPerDWPT}. Finally it
+ * re-evaluates the stall state.
+ *
+ * @return a DWPT checked out for flushing (this state's or another pending
+ *         one), or {@code null} if nothing is pending or none could be
+ *         checked out
+ */
+ synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
+ boolean isUpdate) {
+ commitPerThreadBytes(perThread);
+ if (!perThread.flushPending) {
+ if (isUpdate) {
+ flushPolicy.onUpdate(this, perThread);
+ } else {
+ flushPolicy.onInsert(this, perThread);
+ }
+ if (!perThread.flushPending && perThread.perThreadBytes > maxBytesPerDWPT) {
+ // Safety check to prevent a single DWPT from exceeding its RAM limit.
+ // This is critical since we cannot address more than 2048 MB per DWPT
+ // (the hard limit guards against address space exhaustion).
+ setFlushPending(perThread);
+ }
+ }
+ final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread);
+ healthiness.updateStalled(this);
+ return flushingDWPT;
+ }
+ /**
+ * Releases the bookkeeping for a DWPT previously returned by
+ * {@code tryCheckoutForFlush}. It decrements the flushing count, subtracts
+ * the bytes recorded at checkout from {@code flushBytes}, hands the DWPT
+ * back to the pool via {@code recycle}, and re-evaluates the stall state.
+ * NOTE(review): with assertions disabled, a DWPT that is not in
+ * {@code flushingWriters} causes a NullPointerException when {@code bytes}
+ * is unboxed, after {@code numFlushing} has already been decremented.
+ */
+ synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
+ assert flushingWriters.containsKey(dwpt);
+ numFlushing--;
+ Long bytes = flushingWriters.remove(dwpt);
+ flushBytes -= bytes.longValue();
+ perThreadPool.recycle(dwpt);
+ healthiness.updateStalled(this);
+ }
+
+ /**
+ * Sets flush pending state on the given {@link ThreadState}. The
+ * {@link ThreadState} must have indexed at least one Document and must not
+ * already be pending. Moves the state's committed bytes from
+ * {@code activeBytes} to {@code flushBytes} and increments the pending
+ * count.
+ */
+ public synchronized void setFlushPending(ThreadState perThread) {
+ assert !perThread.flushPending;
+ assert perThread.perThread.getNumDocsInRAM() > 0;
+ perThread.flushPending = true; // write access synced
+ final long bytes = perThread.perThreadBytes;
+ flushBytes += bytes;
+ activeBytes -= bytes;
+ numPending++; // write access synced
+ }
+ /**
+ * Called when the DWPT of {@code state} aborted. Removes the state's
+ * committed bytes from {@code flushBytes} (if pending) or
+ * {@code activeBytes}, then swaps the stale DWPT out of the pool via
+ * {@code replaceForFlush} and re-evaluates the stall state.
+ * NOTE(review): {@code numPending} is not decremented here even when the
+ * state was flush pending, and {@code perThreadBytes} is not reset. Confirm
+ * that replaceForFlush resets the ThreadState accordingly.
+ */
+ synchronized void doOnAbort(ThreadState state) {
+ if (state.flushPending) {
+ flushBytes -= state.perThreadBytes;
+ } else {
+ activeBytes -= state.perThreadBytes;
+ }
+ // take it out of the loop: this DWPT is stale
+ perThreadPool.replaceForFlush(state, closed);
+ healthiness.updateStalled(this);
+ }
+ /**
+ * Tries to take the DWPT of {@code perThread} out of the pool for flushing.
+ * If {@code setPending} is true, the state is first marked flush pending.
+ * A DWPT is only checked out when the state is flush pending, its lock can
+ * be taken with {@code tryLock()}, and it is still active. In that case its
+ * DWPT is swapped out with {@code replaceForFlush}, its bytes are recorded
+ * in {@code flushingWriters}, and it moves from the pending to the flushing
+ * count.
+ * NOTE(review): DocumentsWriter.flushAllThreads calls this while already
+ * holding the state's lock and asserts a non-null result. That relies on
+ * {@code tryLock()} being reentrant; presumably ThreadState is a
+ * ReentrantLock (it exposes isHeldByCurrentThread()). Confirm.
+ *
+ * @return the checked-out DWPT, or {@code null} if the state is not pending,
+ *         its lock could not be acquired, or it is no longer active
+ */
+ synchronized DocumentsWriterPerThread tryCheckoutForFlush(
+ ThreadState perThread, boolean setPending) {
+ if (setPending && !perThread.flushPending) {
+ setFlushPending(perThread);
+ }
+ if (perThread.flushPending) {
+ // we are pending so all memory is already moved to flushBytes
+ if (perThread.tryLock()) {
+ try {
+ if (perThread.isActive()) {
+ assert perThread.isHeldByCurrentThread();
+ final DocumentsWriterPerThread dwpt;
+ final long bytes = perThread.perThreadBytes; // must be read before
+ // replaceForFlush below swaps the state's DWPT
+ dwpt = perThreadPool.replaceForFlush(perThread, closed);
+ assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
+ // record the flushing DWPT to reduce flushBytes in doAfterFlush
+ flushingWriters.put(dwpt, Long.valueOf(bytes));
+ numPending--; // write access synced
+ numFlushing++;
+ return dwpt;
+ }
+ } finally {
+ perThread.unlock();
+ }
+ }
+ }
+ return null;
+ }
+ /**
+ * Returns a DWPT to flush if any state is flush pending. It first tries
+ * {@code perThread} (skipped when {@code null}), then any other pending
+ * state via {@link #nextPendingFlush()}. Not synchronized: it reads the
+ * volatile {@code numPending}, so when nothing is pending it returns
+ * {@code null} without taking the monitor.
+ */
+ DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) {
+ if (numPending > 0) {
+ final DocumentsWriterPerThread dwpt = perThread == null ? null
+ : tryCheckoutForFlush(perThread, false);
+ if (dwpt == null) {
+ return nextPendingFlush();
+ }
+ return dwpt;
+ }
+ return null;
+ }
+ // NOTE(review): toString reads activeBytes/flushBytes without holding the monitor, so the values may be stale.
+ @Override
+ public String toString() {
+ return "DocumentsWriterFlushControl [activeBytes=" + activeBytes
+ + ", flushBytes=" + flushBytes + "]";
+ }
+ /**
+ * Scans the active thread states and returns the first flush-pending DWPT
+ * that {@code tryCheckoutForFlush} manages to check out, or {@code null}.
+ * Not synchronized; the scan stops early once the volatile
+ * {@code numPending} drops to zero.
+ */
+ DocumentsWriterPerThread nextPendingFlush() {
+ if (numPending > 0) {
+ final Iterator<ThreadState> allActiveThreads = perThreadPool
+ .getActivePerThreadsIterator();
+ while (allActiveThreads.hasNext() && numPending > 0) {
+ ThreadState next = allActiveThreads.next();
+ if (next.flushPending) {
+ DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false);
+ if (dwpt != null) {
+ return dwpt;
+ }
+ }
+ }
+ }
+ return null;
+ }
+ /** Marks this flush control closed; the flag is passed to replaceForFlush from then on. */
+ synchronized void setClosed() {
+ // set by DW to signal that we should not release new DWPT after close
+ this.closed = true;
+ }
+
+ /**
+ * Returns an iterator that provides access to all currently active {@link ThreadState}s
+ */
+ public Iterator<ThreadState> allActiveThreads() {
+ return perThreadPool.getActivePerThreadsIterator();
+ }
+ /** Returns the net-bytes limit reported by the configured {@link FlushPolicy}. */
+ long maxNetBytes() {
+ return flushPolicy.getMaxNetBytes();
+ }
+ /** Lets the {@link FlushPolicy} react to a delete on {@code state}, unless the state is already flush pending. */
+ synchronized void doOnDelete(ThreadState state) {
+ if (!state.flushPending) {
+ flushPolicy.onDelete(this, state);
+ }
+ }
+
+ /**
+ * Returns the number of delete terms in the global pool
+ * (the shared {@code pendingDeletes} passed to the constructor).
+ */
+ public int getNumGlobalTermDeletes() {
+ return pendingDeletes.numTermDeletes.get();
+ }
+ /** Returns the number of DWPTs currently checked out for flushing (unsynchronized volatile read). */
+ int numFlushingDWPT() {
+ return numFlushing;
+ }
+ /** Requests a flush of buffered deletes; DocumentsWriter clears the flag via getAndSet(false) after applying deletes. */
+ public void setFlushDeletes() {
+ flushDeletes.set(true);
+ }
+ /**
+ * NOTE(review): despite its name, this returns the pool's maximum number of
+ * thread states ({@code getMaxThreadStates()}), not a count of currently
+ * active DWPTs. Confirm which one callers such as the FlushPolicy expect.
+ */
+ int numActiveDWPT() {
+ return this.perThreadPool.getMaxThreadStates();
+ }
+}
\ No newline at end of file
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Mar 30 13:30:07 2011
@@ -31,7 +31,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
+import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.RamUsageEstimator;
public class DocumentsWriterPerThread {
@@ -73,17 +73,14 @@ public class DocumentsWriterPerThread {
final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
- final InvertedDocConsumer termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
- new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
+ final InvertedDocConsumer termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter, true,
+ new TermsHash(documentsWriterPerThread, termVectorsWriter, false, null));
final NormsWriter normsWriter = new NormsWriter();
final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
return new DocFieldProcessor(documentsWriterPerThread, docInverter);
}
};
- // Deletes for our still-in-RAM (to be flushed next) segment
- BufferedDeletes pendingDeletes = new BufferedDeletes(false);
-
static class DocState {
final DocumentsWriterPerThread docWriter;
Analyzer analyzer;
@@ -128,7 +125,7 @@ public class DocumentsWriterPerThread {
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
void abort() throws IOException {
- aborting = true;
+ hasAborted = aborting = true;
try {
if (infoStream != null) {
message("docWriter: now abort");
@@ -152,38 +149,53 @@ public class DocumentsWriterPerThread {
final DocumentsWriter parent;
final IndexWriter writer;
-
final Directory directory;
final DocState docState;
final DocConsumer consumer;
+ final AtomicLong bytesUsed;
+
+ SegmentWriteState flushState;
+ //Deletes for our still-in-RAM (to be flushed next) segment
+ BufferedDeletes pendingDeletes;
+ String segment; // Current segment we are working on
+ boolean aborting = false; // True if an abort is pending
+ boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting
- String segment; // Current segment we are working on
- boolean aborting; // True if an abort is pending
-
+ private FieldInfos fieldInfos;
private final PrintStream infoStream;
private int numDocsInRAM;
private int flushedDocCount;
- SegmentWriteState flushState;
-
- final AtomicLong bytesUsed = new AtomicLong(0);
-
- private FieldInfos fieldInfos;
-
- public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, FieldInfos fieldInfos, IndexingChain indexingChain) {
+
+ public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
+ FieldInfos fieldInfos, IndexingChain indexingChain) {
this.directory = directory;
this.parent = parent;
this.fieldInfos = fieldInfos;
this.writer = parent.indexWriter;
this.infoStream = parent.indexWriter.getInfoStream();
this.docState = new DocState(this);
- this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
+ this.docState.similarityProvider = parent.indexWriter.getConfig()
+ .getSimilarityProvider();
consumer = indexingChain.getChain(this);
- }
+ bytesUsed = new AtomicLong(0);
+ pendingDeletes = new BufferedDeletes(false);
+ }
+
+ public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) {
+ this(other.directory, other.parent, fieldInfos, other.parent.chain);
+
+ }
void setAborting() {
aborting = true;
}
+
+ boolean checkAndResetHasAborted() {
+ final boolean retval = hasAborted;
+ hasAborted = false;
+ return retval;
+ }
public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
assert writer.testPoint("DocumentsWriterPerThread addDocument start");
@@ -203,7 +215,7 @@ public class DocumentsWriterPerThread {
boolean success = false;
try {
try {
- consumer.processDocument(fieldInfos);
+ consumer.processDocument(fieldInfos);
} finally {
docState.clear();
}
@@ -251,21 +263,33 @@ public class DocumentsWriterPerThread {
void deleteQueries(Query... queries) {
if (numDocsInRAM > 0) {
- for (Query query : queries) {
- pendingDeletes.addQuery(query, numDocsInRAM);
+ for (Query query : queries) {
+ pendingDeletes.addQuery(query, numDocsInRAM);
+ }
}
}
- }
void deleteTerms(Term... terms) {
if (numDocsInRAM > 0) {
- for (Term term : terms) {
- pendingDeletes.addTerm(term, numDocsInRAM);
+ for (Term term : terms) {
+ pendingDeletes.addTerm(term, numDocsInRAM);
+ }
}
}
+
+ /**
+ * Returns the number of delete terms in this {@link DocumentsWriterPerThread}
+ */
+ public int numDeleteTerms() {
+ // public for FlushPolicy
+ return pendingDeletes.numTermDeletes.get();
}
- int getNumDocsInRAM() {
+ /**
+ * Returns the number of RAM resident documents in this {@link DocumentsWriterPerThread}
+ */
+ public int getNumDocsInRAM() {
+ // public for FlushPolicy
return numDocsInRAM;
}
@@ -285,7 +309,6 @@ public class DocumentsWriterPerThread {
/** Flush all pending docs to a new segment */
FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
-
flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
numDocsInRAM, writer.getConfig().getTermIndexInterval(),
fieldInfos.buildSegmentCodecs(true), pendingDeletes);
@@ -323,16 +346,17 @@ public class DocumentsWriterPerThread {
newSegment.setHasVectors(flushState.hasVectors);
if (infoStream != null) {
- message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
+ message("new segment has " + (flushState.deletedDocs == null ? 0 : flushState.deletedDocs.count()) + " deleted docs");
message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
message("flushedFiles=" + newSegment.files());
message("flushed codecs=" + newSegment.getSegmentCodecs());
}
flushedDocCount += flushState.numDocs;
- BufferedDeletes segmentDeletes = null;
+ final BufferedDeletes segmentDeletes;
if (pendingDeletes.queries.isEmpty()) {
pendingDeletes.clear();
+ segmentDeletes = null;
} else {
segmentDeletes = pendingDeletes;
pendingDeletes = new BufferedDeletes(false);
@@ -350,7 +374,6 @@ public class DocumentsWriterPerThread {
parent.indexWriter.deleter.refresh(segment);
}
}
-
abort();
}
}
@@ -362,7 +385,7 @@ public class DocumentsWriterPerThread {
}
long bytesUsed() {
- return bytesUsed.get();
+ return bytesUsed.get() + pendingDeletes.bytesUsed.get();
}
FieldInfos getFieldInfos() {
@@ -395,11 +418,38 @@ public class DocumentsWriterPerThread {
bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
return b;
}
+
+  /**
+   * Subtracts the RAM accounted for {@code length} recycled int blocks from
+   * {@link #bytesUsed}, mirroring the per-block increment done when an int
+   * block is handed out. The blocks themselves are neither cleared nor reused
+   * here; {@code blocks} and {@code offset} are currently ignored.
+   */
+  void recycleIntBlocks(int[][] blocks, int offset, int length) {
+    // widen to long before multiplying so a large recycle cannot overflow int
+    bytesUsed.addAndGet(-((long) length * INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT));
+  }
- final DirectAllocator byteBlockAllocator = new DirectAllocator();
+  /** Byte-block allocator whose allocations are charged to {@link #bytesUsed}. */
+  final Allocator byteBlockAllocator = new DirectTrackingAllocator();
+
+  /**
+   * {@link Allocator} that always allocates fresh byte arrays and keeps this
+   * DWPT's {@link #bytesUsed} counter in sync: every block handed out adds
+   * {@code blockSize} bytes, and every recycled block subtracts them again.
+   * Recycled slots are nulled out rather than pooled.
+   */
+  private class DirectTrackingAllocator extends Allocator {
+    public DirectTrackingAllocator() {
+      this(BYTE_BLOCK_SIZE);
+    }
+
+    public DirectTrackingAllocator(int blockSize) {
+      super(blockSize);
+    }
+
+    @Override
+    public byte[] getByteBlock() {
+      bytesUsed.addAndGet(blockSize);
+      return new byte[blockSize];
+    }
+
+    @Override
+    public void recycleByteBlocks(byte[][] blocks, int start, int end) {
+      // long arithmetic: (end - start) * blockSize can overflow int for large recycles
+      bytesUsed.addAndGet(-((long) (end - start) * blockSize));
+      // drop the references so the recycled arrays become garbage-collectable
+      for (int i = start; i < end; i++) {
+        blocks[i] = null;
+      }
+    }
+  }
String toMB(long v) {
return nf.format(v/1024./1024.);
}
-
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Wed Mar 30 13:30:07 2011
@@ -9,16 +9,92 @@ import org.apache.lucene.index.SegmentCo
import org.apache.lucene.index.codecs.CodecProvider;
public abstract class DocumentsWriterPerThreadPool {
- final static class ThreadState extends ReentrantLock {
- final DocumentsWriterPerThread perThread;
+
+ /**
+ * {@link ThreadState} references and guards a
+ * {@link DocumentsWriterPerThread} instance that is used during indexing to
+ * build a in-memory index segment. {@link ThreadState} also holds all flush
+ * related per-thread data controlled by {@link DocumentsWriterFlushControl}.
+ * <p>
+ * A {@link ThreadState}, its methods and members should only accessed by one
+ * thread a time. Users must acquire the lock via {@link ThreadState#lock()}
+ * and release the lock in a finally block via {@link ThreadState#unlock()}
+ * before accessing the state.
+ */
+ @SuppressWarnings("serial")
+ public final static class ThreadState extends ReentrantLock {
+ // public for FlushPolicy
+ DocumentsWriterPerThread perThread;
+ // write access guarded by DocumentsWriterFlushControl
+ volatile boolean flushPending = false;
+ // write access guarded by DocumentsWriterFlushControl
+ long perThreadBytes = 0;
+
+ // guarded by Reentrant lock
+ private boolean isActive = true;
ThreadState(DocumentsWriterPerThread perThread) {
this.perThread = perThread;
}
+
+    /**
+     * Installs the given {@link DocumentsWriterPerThread} in this ThreadState
+     * and clears the per-thread flush bookkeeping ({@code perThreadBytes} and
+     * {@code flushPending}). Passing <code>null</code> marks this ThreadState
+     * as inactive, after which it must no longer be used for indexing. The
+     * caller must hold this ThreadState's lock.
+     *
+     * @see #isActive()
+     */
+    void resetWriter(DocumentsWriterPerThread perThread) {
+      assert isHeldByCurrentThread();
+      // a null writer deactivates the state; a non-null one leaves the flag as is
+      isActive &= (perThread != null);
+      this.perThread = perThread;
+      perThreadBytes = 0;
+      flushPending = false;
+    }
+
+    /**
+     * Tells whether this ThreadState may still be used for indexing. It becomes
+     * <code>false</code> once {@link #resetWriter} has been called with
+     * <code>null</code>, which happens when the state is checked out for flush
+     * after the DW has been closed. The caller must hold this ThreadState's
+     * lock.
+     */
+    boolean isActive() {
+      assert isHeldByCurrentThread();
+      return this.isActive;
+    }
+
+    /**
+     * Returns the number of bytes currently attributed to this ThreadState's
+     * {@link DocumentsWriterPerThread}. Public so that {@link FlushPolicy}
+     * implementations can read it; the caller must hold this ThreadState's
+     * lock.
+     */
+    public long getBytesUsedPerThread() {
+      assert isHeldByCurrentThread();
+      return this.perThreadBytes;
+    }
+
+    /**
+     * Returns the {@link DocumentsWriterPerThread} currently held by this
+     * {@link ThreadState} (<code>null</code> once the state has been
+     * deactivated). Public so that {@link FlushPolicy} implementations can
+     * read it; the caller must hold this ThreadState's lock.
+     */
+    public DocumentsWriterPerThread getDocumentsWriterPerThread() {
+      assert isHeldByCurrentThread();
+      return this.perThread;
+    }
+
+    /**
+     * Tells whether this {@link ThreadState} has been marked as flush pending.
+     * Reads the volatile {@code flushPending} flag without asserting lock
+     * ownership.
+     */
+    public boolean isFlushPending() {
+      return this.flushPending;
+    }
}
private final ThreadState[] perThreads;
private volatile int numThreadStatesActive;
+ private CodecProvider codecProvider;
+ private FieldNumberBiMap globalFieldMap;
public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
@@ -28,7 +104,8 @@ public abstract class DocumentsWriterPer
}
public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
- final CodecProvider codecProvider = config.getCodecProvider();
+ codecProvider = config.getCodecProvider();
+ this.globalFieldMap = globalFieldMap;
for (int i = 0; i < perThreads.length; i++) {
final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
@@ -41,26 +118,53 @@ public abstract class DocumentsWriterPer
public synchronized ThreadState newThreadState() {
if (numThreadStatesActive < perThreads.length) {
- ThreadState state = perThreads[numThreadStatesActive];
- numThreadStatesActive++;
- return state;
+ return perThreads[numThreadStatesActive++];
}
-
return null;
}
-
+
+  /**
+   * Detaches the current {@link DocumentsWriterPerThread} from the given
+   * ThreadState and returns it so it can be flushed. Unless {@code closed} is
+   * set, a fresh DWPT with new {@link FieldInfos} (built from the global field
+   * map and the codec provider) is installed in its place. Otherwise the state
+   * is reset to <code>null</code>, which marks it inactive. In both cases the
+   * state's thread bindings are cleared afterwards. The caller must hold the
+   * ThreadState's lock.
+   */
+  protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
+    assert threadState.isHeldByCurrentThread();
+    final DocumentsWriterPerThread flushingDwpt = threadState.perThread;
+    final DocumentsWriterPerThread replacement;
+    if (closed) {
+      replacement = null;
+    } else {
+      final FieldInfos freshInfos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
+      replacement = new DocumentsWriterPerThread(flushingDwpt, freshInfos);
+    }
+    threadState.resetWriter(replacement);
+    // TODO: the DWPT is swapped anyway, so clearing the thread bindings here may be unnecessary
+    clearThreadBindings(threadState);
+    return flushingDwpt;
+  }
+
+  /**
+   * Hook that lets a pool implementation reuse a
+   * {@link DocumentsWriterPerThread} instance. The default implementation does
+   * nothing, so DWPTs are not recycled. (Callers are not visible here;
+   * presumably this is invoked once a DWPT has been flushed.)
+   */
+  public void recycle(DocumentsWriterPerThread dwpt) {
+    // intentionally a no-op by default
+  }
+
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
public abstract void clearThreadBindings(ThreadState perThread);
public abstract void clearAllThreadBindings();
+ /**
+ * Returns an iterator providing access to all {@link ThreadState}
+ * instances.
+ */
public Iterator<ThreadState> getAllPerThreadsIterator() {
return getPerThreadsIterator(this.perThreads.length);
}
+ /**
+ * Returns an iterator providing access to all active {@link ThreadState}
+ * instances.
+ * <p>
+ * Note: The returned iterator will only iterator
+ * {@link ThreadState}s that are active at the point in time when this method
+ * has been called.
+ *
+ */
public Iterator<ThreadState> getActivePerThreadsIterator() {
- return getPerThreadsIterator(this.numThreadStatesActive);
+ return getPerThreadsIterator(numThreadStatesActive);
}
private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
@@ -80,4 +184,21 @@ public abstract class DocumentsWriterPer
}
};
}
+
+ /**
+ * Returns the ThreadState with the minimum estimated number of threads
+ * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
+ * is yet visible to the calling thread.
+ */
+ protected ThreadState minContendedThreadState() {
+ ThreadState minThreadState = null;
+ final Iterator<ThreadState> it = getActivePerThreadsIterator();
+ while (it.hasNext()) {
+ final ThreadState state = it.next();
+ if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
+ minThreadState = state;
+ }
+ }
+ return minThreadState;
+ }
}
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,66 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * Default {@link FlushPolicy} implementation that flushes based on RAM
+ * consumption, document count and number of buffered delete terms, depending
+ * on the IndexWriter's {@link IndexWriterConfig}. Only settings that are
+ * enabled (i.e. not set to {@link IndexWriterConfig#DISABLE_AUTO_FLUSH}) are
+ * taken into account. The configuration values are re-read on every
+ * notification. All enabled settings are used to mark
+ * {@link DocumentsWriterPerThread}s as flush pending during indexing, with
+ * respect to their live updates.
+ * <p>
+ * If {@link IndexWriterConfig#setMaxBufferedDocs(int)} is enabled, a
+ * {@link DocumentsWriterPerThread} whose RAM-resident document count reaches
+ * that limit is marked as flush pending.
+ * <p>
+ * Otherwise, if {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is
+ * enabled, the largest RAM-consuming non-pending
+ * {@link DocumentsWriterPerThread} is marked as flush pending whenever the
+ * global active RAM consumption is equal to or higher than the configured max
+ * RAM buffer.
+ * <p>
+ * If {@link IndexWriterConfig#setMaxBufferedDeleteTerms(int)} is enabled and the
+ * global number of buffered delete terms reaches it, a flush of the deletes is
+ * requested via {@link DocumentsWriterFlushControl#setFlushDeletes()}.
+ */
+public class FlushByRamOrCountsPolicy extends FlushPolicy {
+
+  @Override
+  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
+    if (flushOnDeleteTerms()) {
+      final int maxBufferedDeleteTerms = indexWriterConfig.getMaxBufferedDeleteTerms();
+      if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) {
+        control.setFlushDeletes();
+      }
+    }
+  }
+
+  @Override
+  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
+    if (flushOnDocCount()
+        && state.perThread.getNumDocsInRAM() >= indexWriterConfig.getMaxBufferedDocs()) {
+      // this DWPT alone reached the doc-count limit: flush it
+      control.setFlushPending(state);
+    } else if (flushOnRAM()) {
+      // the RAM limit is global: once reached, flush the largest non-pending DWPT
+      final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
+      final long totalRam = control.activeBytes();
+      if (totalRam >= limit) {
+        markLargestWriterPending(control, state, totalRam);
+      }
+    }
+  }
+}
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,191 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.SetOnce;
+
+/**
+ * {@link FlushPolicy} controls when segments are flushed from a RAM resident
+ * internal data-structure to the {@link IndexWriter}'s {@link Directory}.
+ * <p>
+ * Segments are traditionally flushed by:
+ * <ul>
+ * <li>RAM consumption - configured via
+ * {@link IndexWriterConfig#setRAMBufferSizeMB(double)}</li>
+ * <li>Number of RAM resident documents - configured via
+ * {@link IndexWriterConfig#setMaxBufferedDocs(int)}</li>
+ * <li>Number of buffered delete terms - configured via
+ * {@link IndexWriterConfig#setMaxBufferedDeleteTerms(int)}</li>
+ * </ul>
+ *
+ * The {@link IndexWriter} uses a provided {@link FlushPolicy} to control the
+ * flushing process during indexing. The policy is informed for each added or
+ * updated document as well as for each delete term. Based on the information
+ * provided via {@link ThreadState} and {@link DocumentsWriterFlushControl}, the
+ * {@link FlushPolicy} can decide whether a {@link DocumentsWriterPerThread}
+ * needs flushing and can mark it as flush-pending via
+ * {@link DocumentsWriterFlushControl#setFlushPending(ThreadState)}.
+ *
+ * @see ThreadState
+ * @see DocumentsWriterFlushControl
+ * @see DocumentsWriterPerThread
+ * @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
+ */
+public abstract class FlushPolicy {
+  protected final SetOnce<DocumentsWriter> writer = new SetOnce<DocumentsWriter>();
+  protected IndexWriterConfig indexWriterConfig;
+
+  /**
+   * Called for each delete term applied to the given {@link ThreadState}'s
+   * {@link DocumentsWriterPerThread}.
+   * <p>
+   * Note: This method is synchronized by the given
+   * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
+   * thread holds the lock on the given {@link ThreadState}.
+   */
+  public abstract void onDelete(DocumentsWriterFlushControl control,
+      ThreadState state);
+
+  /**
+   * Called for each document update on the given {@link ThreadState}'s
+   * {@link DocumentsWriterPerThread}. The default implementation treats an
+   * update as an insert plus a delete. It calls
+   * {@link #onInsert(DocumentsWriterFlushControl, ThreadState)} first, and then
+   * {@link #onDelete(DocumentsWriterFlushControl, ThreadState)} only if the
+   * state is not flush pending after that call.
+   * <p>
+   * Note: This method is synchronized by the given
+   * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
+   * thread holds the lock on the given {@link ThreadState}.
+   */
+  public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
+    onInsert(control, state);
+    if (!state.flushPending) {
+      onDelete(control, state);
+    }
+  }
+
+  /**
+   * Called for each document addition on the given {@link ThreadState}'s
+   * {@link DocumentsWriterPerThread}.
+   * <p>
+   * Note: This method is synchronized by the given
+   * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
+   * thread holds the lock on the given {@link ThreadState}.
+   */
+  public abstract void onInsert(DocumentsWriterFlushControl control,
+      ThreadState state);
+
+  /**
+   * Called by {@link DocumentsWriter} to initialize the FlushPolicy. It binds
+   * the policy to the given writer, which is stored in a {@link SetOnce}, and
+   * caches the writer's {@link IndexWriterConfig}.
+   */
+  protected synchronized void init(DocumentsWriter docsWriter) {
+    writer.set(docsWriter);
+    indexWriterConfig = docsWriter.indexWriter.getConfig();
+  }
+
+  /**
+   * Marks the most RAM consuming active, non-pending
+   * {@link DocumentsWriterPerThread} as flush pending. The candidate is chosen
+   * by {@link #findLargestNonPendingWriter(DocumentsWriterFlushControl, ThreadState)}.
+   * <p>
+   * Note: <code>currentBytesPerThread</code> is currently not used.
+   * {@link FlushByRamOrCountsPolicy} passes the total active RAM here.
+   */
+  protected void markLargestWriterPending(DocumentsWriterFlushControl control,
+      ThreadState perThreadState, final long currentBytesPerThread) {
+    control.setFlushPending(findLargestNonPendingWriter(control, perThreadState));
+  }
+
+  /**
+   * Returns the current most RAM consuming non-pending {@link ThreadState} with
+   * at least one indexed document.
+   * <p>
+   * The given <code>perThreadState</code> is the initial candidate and must not
+   * itself be flush pending. Another active state replaces it only if that
+   * state is not pending, uses strictly more RAM, and holds at least one
+   * document.
+   * <p>
+   * This method will never return <code>null</code>.
+   */
+  protected ThreadState findLargestNonPendingWriter(
+      DocumentsWriterFlushControl control, ThreadState perThreadState) {
+    long maxRamSoFar = perThreadState.perThreadBytes;
+    // the dwpt which needs to be flushed eventually
+    ThreadState maxRamUsingThreadState = perThreadState;
+    assert !perThreadState.flushPending : "DWPT should have flushed";
+    Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreads();
+    while (activePerThreadsIterator.hasNext()) {
+      ThreadState next = activePerThreadsIterator.next();
+      if (!next.flushPending) {
+        final long nextRam = next.perThreadBytes;
+        if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
+          maxRamSoFar = nextRam;
+          maxRamUsingThreadState = next;
+        }
+      }
+    }
+    assert maxRamUsingThreadState.perThread.getNumDocsInRAM() > 0;
+    assert writer.get().message(
+        "set largest ram consuming thread pending on lower watermark");
+    return maxRamUsingThreadState;
+  }
+
+  /**
+   * Returns the max net memory, which marks the upper watermark for the
+   * DocumentsWriter to be healthy. If all flushing and active
+   * {@link DocumentsWriterPerThread}s consume more memory than the upper
+   * watermark, all incoming threads should be stalled and blocked until the
+   * memory drops below it.
+   * <p>
+   * Note: the upper watermark is only taken into account if this
+   * {@link FlushPolicy} flushes by RAM usage. If it does not, this method
+   * returns <code>-1</code>.
+   * <p>
+   * The default max net memory is 2 x
+   * {@link IndexWriterConfig#getRAMBufferSizeMB()}.
+   */
+  public long getMaxNetBytes() {
+    if (!flushOnRAM()) {
+      return -1;
+    }
+    final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+    return (long) (ramBufferSizeMB * 1024.d * 1024.d * 2);
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDocCount() {
+    return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDeleteTerms() {
+    return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnRAM() {
+    return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+}
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,120 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * Controls the health status of a {@link DocumentsWriter} session. This class
+ * is used to block incoming indexing threads if flushing is significantly
+ * slower than indexing, to keep the {@link DocumentsWriter} healthy. Otherwise
+ * the net memory used within an {@link IndexWriter} session can increase very
+ * quickly and easily exceed the JVM's available memory.
+ * <p>
+ * To prevent OOM errors and ensure the IndexWriter's stability, this class
+ * stalls incoming indexing threads in {@link #waitIfStalled()} once the number
+ * of flushing {@link DocumentsWriterPerThread}s exceeds the number of active
+ * ones, as reported by {@link DocumentsWriterFlushControl}. Once flushing
+ * catches up, and the number of flushing DWPTs is equal to or lower than the
+ * number of active DWPTs, {@link #updateStalled} resets the state. The blocked
+ * threads are then released and can continue indexing.
+ */
+final class Healthiness {
+
+  /**
+   * Synchronizer whose state is 0 while healthy and positive while stalled.
+   * Threads acquire in shared mode and block only while the state is non-zero.
+   */
+  @SuppressWarnings("serial")
+  private static final class Sync extends AbstractQueuedSynchronizer {
+    volatile boolean hasBlockedThreads = false; // only with assert
+
+    Sync() {
+      setState(0);
+    }
+
+    boolean isHealthy() {
+      return getState() == 0;
+    }
+
+    /** Makes a single CAS attempt to increment the stall counter. */
+    boolean trySetStalled() {
+      final int state = getState();
+      return compareAndSetState(state, state + 1);
+    }
+
+    /**
+     * Resets the state to healthy and releases all waiting threads. Returns
+     * <code>false</code> only if a concurrent update made the CAS fail.
+     */
+    boolean tryReset() {
+      final int oldState = getState();
+      if (oldState == 0) {
+        return true;
+      }
+      if (compareAndSetState(oldState, 0)) {
+        releaseShared(0);
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public int tryAcquireShared(int acquires) {
+      // read the state once so the assert bookkeeping and the decision agree
+      final int state = getState();
+      assert maybeSetHasBlocked(state);
+      return state == 0 ? 1 : -1;
+    }
+
+    // only used for testing: records that some acquirer observed a stalled state
+    private boolean maybeSetHasBlocked(int state) {
+      // only ever write true; a read-modify-write (|=) on the volatile field could
+      // overwrite a concurrent 'true' with a stale 'false'
+      if (state != 0) {
+        hasBlockedThreads = true;
+      }
+      return true;
+    }
+
+    @Override
+    public boolean tryReleaseShared(int newState) {
+      return (getState() == 0);
+    }
+  }
+
+  private final Healthiness.Sync sync = new Sync();
+  volatile boolean wasStalled = false; // only with asserts
+
+  boolean isStalled() {
+    return !sync.isHealthy();
+  }
+
+  /**
+   * Updates the stalled flag status. This method sets the stalled flag to
+   * <code>true</code> iff the number of flushing
+   * {@link DocumentsWriterPerThread}s is greater than the number of active
+   * {@link DocumentsWriterPerThread}s. Otherwise it resets the
+   * {@link Healthiness} to healthy and releases all threads waiting on
+   * {@link #waitIfStalled()}.
+   */
+  void updateStalled(DocumentsWriterFlushControl flushControl) {
+    do {
+      // if we have more flushing DWPTs than active DWPTs we stall!
+      while (flushControl.numActiveDWPT() < flushControl.numFlushingDWPT()) {
+        if (sync.trySetStalled()) {
+          assert wasStalled = true;
+          return;
+        }
+      }
+    } while (!sync.tryReset());
+  }
+
+  /**
+   * Blocks the calling thread (uninterruptibly) while this instance is
+   * stalled. Returns immediately when healthy.
+   */
+  void waitIfStalled() {
+    sync.acquireShared(0);
+  }
+
+  /** Test-only: whether any thread ever observed a stalled state while acquiring. */
+  boolean hasBlocked() {
+    return sync.hasBlockedThreads;
+  }
+}
\ No newline at end of file
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Wed Mar 30 13:30:07 2011
@@ -248,8 +248,8 @@ public class IndexWriter implements Clos
private long mergeGen;
private boolean stopMerges;
- private final AtomicInteger flushCount = new AtomicInteger();
- private final AtomicInteger flushDeletesCount = new AtomicInteger();
+ final AtomicInteger flushCount = new AtomicInteger();
+ final AtomicInteger flushDeletesCount = new AtomicInteger();
final ReaderPool readerPool = new ReaderPool();
final BufferedDeletesStream bufferedDeletesStream;
@@ -2540,17 +2540,7 @@ public class IndexWriter implements Clos
doBeforeFlush();
assert testPoint("startDoFlush");
-
- // We may be flushing because it was triggered by doc
- // count, del count, ram usage (in which case flush
- // pending is already set), or we may be flushing
- // due to external event eg getReader or commit is
- // called (in which case we now set it, and this will
- // pause all threads):
- flushControl.setFlushPendingNoWait("explicit flush");
-
boolean success = false;
-
try {
if (infoStream != null) {
@@ -2566,8 +2556,7 @@ public class IndexWriter implements Clos
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
- if (flushControl.getFlushDeletes() ||
- (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+ if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
applyAllDeletes = true;
if (infoStream != null) {
@@ -2580,39 +2569,16 @@ public class IndexWriter implements Clos
if (infoStream != null) {
message("apply all deletes during flush");
}
- flushDeletesCount.incrementAndGet();
- final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
- if (result.anyDeletes) {
- checkpoint();
- }
- if (!keepFullyDeletedSegments && result.allDeleted != null) {
- if (infoStream != null) {
- message("drop 100% deleted segments: " + result.allDeleted);
- }
- for(SegmentInfo info : result.allDeleted) {
- // If a merge has already registered for this
- // segment, we leave it in the readerPool; the
- // merge will skip merging it and will then drop
- // it once it's done:
- if (!mergingSegments.contains(info)) {
- segmentInfos.remove(info);
- if (readerPool != null) {
- readerPool.drop(info);
- }
- }
- }
- checkpoint();
- }
- bufferedDeletesStream.prune(segmentInfos);
- assert !bufferedDeletesStream.any();
-
- flushControl.clearDeletes();
+ applyAllDeletes();
} else if (infoStream != null) {
message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
}
doAfterFlush();
- flushCount.incrementAndGet();
+ if (!maybeMerge) {
+ // flushCount is incremented in flushAllThreads
+ flushCount.incrementAndGet();
+ }
success = true;
@@ -2624,20 +2590,51 @@ public class IndexWriter implements Clos
// never hit
return false;
} finally {
- flushControl.clearFlushPending();
if (!success && infoStream != null)
message("hit exception during flush");
}
}
+
+ /**
+ * Applies all deletes buffered in bufferedDeletesStream to the current
+ * segmentInfos, using the readerPool. It calls checkpoint() if any deletes
+ * were applied. Segments that became 100% deleted are then dropped, unless
+ * keepFullyDeletedSegments is set or a merge has already registered them,
+ * followed by another checkpoint(). Finally bufferedDeletesStream.prune(segmentInfos)
+ * is called. Every call increments flushDeletesCount.
+ */
+ final synchronized void applyAllDeletes() throws IOException {
+ flushDeletesCount.incrementAndGet();
+ final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
+ if (result.anyDeletes) {
+ checkpoint();
+ }
+ // drop segments whose documents are now all deleted
+ if (!keepFullyDeletedSegments && result.allDeleted != null) {
+ if (infoStream != null) {
+ message("drop 100% deleted segments: " + result.allDeleted);
+ }
+ for(SegmentInfo info : result.allDeleted) {
+ // If a merge has already registered for this
+ // segment, we leave it in the readerPool; the
+ // merge will skip merging it and will then drop
+ // it once it's done:
+ if (!mergingSegments.contains(info)) {
+ segmentInfos.remove(info);
+ if (readerPool != null) {
+ readerPool.drop(info);
+ }
+ }
+ }
+ checkpoint();
+ }
+ bufferedDeletesStream.prune(segmentInfos);
+ }
/** Expert: Return the total size of all index files currently cached in memory.
* Useful for size management with flushRamDocs()
*/
public final long ramSizeInBytes() {
ensureOpen();
- // nocommit
- //return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
- return 0;
+ return docWriter.flushControl.netBytes() + bufferedDeletesStream.bytesUsed();
+ }
+
+ // for testing only
+ DocumentsWriter getDocsWriter() {
+ boolean test = false;
+ assert test = true;
+ return test?docWriter: null;
}
/** Expert: Return the number of documents currently
@@ -3681,124 +3678,4 @@ public class IndexWriter implements Clos
public PayloadProcessorProvider getPayloadProcessorProvider() {
return payloadProcessorProvider;
}
-
- // decides when flushes happen
- final class FlushControl {
-
- private boolean flushPending;
- private boolean flushDeletes;
- private int delCount;
- private int docCount;
- private boolean flushing;
-
- private synchronized boolean setFlushPending(String reason, boolean doWait) {
- if (flushPending || flushing) {
- if (doWait) {
- while(flushPending || flushing) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
- }
- return false;
- } else {
- if (infoStream != null) {
- message("now trigger flush reason=" + reason);
- }
- flushPending = true;
- return flushPending;
- }
- }
-
- public synchronized void setFlushPendingNoWait(String reason) {
- setFlushPending(reason, false);
- }
-
- public synchronized boolean getFlushPending() {
- return flushPending;
- }
-
- public synchronized boolean getFlushDeletes() {
- return flushDeletes;
- }
-
- public synchronized void clearFlushPending() {
- if (infoStream != null) {
- message("clearFlushPending");
- }
- flushPending = false;
- flushDeletes = false;
- docCount = 0;
- notifyAll();
- }
-
- public synchronized void clearDeletes() {
- delCount = 0;
- }
-
- public synchronized boolean waitUpdate(int docInc, int delInc) {
- return waitUpdate(docInc, delInc, false);
- }
-
- public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
- while(flushPending) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- // skipWait is only used when a thread is BOTH adding
- // a doc and buffering a del term, and, the adding of
- // the doc already triggered a flush
- if (skipWait) {
- docCount += docInc;
- delCount += delInc;
- return false;
- }
-
- final int maxBufferedDocs = config.getMaxBufferedDocs();
- if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (docCount+docInc) >= maxBufferedDocs) {
- return setFlushPending("maxBufferedDocs", true);
- }
- docCount += docInc;
-
- final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
- if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (delCount+delInc) >= maxBufferedDeleteTerms) {
- flushDeletes = true;
- return setFlushPending("maxBufferedDeleteTerms", true);
- }
- delCount += delInc;
-
- return flushByRAMUsage("add delete/doc");
- }
-
- public synchronized boolean flushByRAMUsage(String reason) {
-// final double ramBufferSizeMB = config.getRAMBufferSizeMB();
-// if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-// final long limit = (long) (ramBufferSizeMB*1024*1024);
-// long used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-// if (used >= limit) {
-//
-// // DocumentsWriter may be able to free up some
-// // RAM:
-// // Lock order: FC -> DW
-// docWriter.balanceRAM();
-//
-// used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-// if (used >= limit) {
-// return setFlushPending("ram full: " + reason, false);
-// }
-// }
-// }
- return false;
- }
- }
-
- final FlushControl flushControl = new FlushControl();
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Wed Mar 30 13:30:07 2011
@@ -94,6 +94,8 @@ public final class IndexWriterConfig imp
/** Default value is 1. Change using {@link #setReaderTermsIndexDivisor(int)}. */
public static final int DEFAULT_READER_TERMS_INDEX_DIVISOR = IndexReader.DEFAULT_TERMS_INDEX_DIVISOR;
+
+ /**
+ * Default per-thread RAM hard limit, in MB: 1945. This is below the 2048 MB
+ * upper bound enforced by {@link #setRAMPerThreadHardLimitMB(int)}.
+ * Change using {@link #setRAMPerThreadHardLimitMB(int)}.
+ */
+ public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
/**
* Sets the default (for any instance) maximum time to wait for a write lock
* (in milliseconds).
@@ -130,6 +132,8 @@ public final class IndexWriterConfig imp
private volatile DocumentsWriterPerThreadPool indexerThreadPool;
private volatile boolean readerPooling;
private volatile int readerTermsIndexDivisor;
+ // Flush policy for the IndexWriter; set via setFlushPolicy(FlushPolicy).
+ private volatile FlushPolicy flushPolicy;
+ // Per-DocumentsWriterPerThread RAM hard limit in MB; set via setRAMPerThreadHardLimitMB(int).
+ private volatile int perThreadHardLimitMB;
private Version matchVersion;
@@ -160,6 +164,7 @@ public final class IndexWriterConfig imp
readerPooling = DEFAULT_READER_POOLING;
indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
+ perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
@Override
@@ -352,6 +357,7 @@ public final class IndexWriterConfig imp
* @throws IllegalArgumentException if maxBufferedDeleteTerms
* is enabled but smaller than 1
* @see #setRAMBufferSizeMB
+ * @see #setFlushPolicy(FlushPolicy)
*
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
@@ -380,14 +386,20 @@ public final class IndexWriterConfig imp
* and deletions before they are flushed to the Directory. Generally for
* faster indexing performance it's best to flush by RAM usage instead of
* document count and use as large a RAM buffer as you can.
- *
* <p>
* When this is set, the writer will flush whenever buffered documents and
* deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
* triggering a flush due to RAM usage. Note that if flushing by document
* count is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ * <p>
+ * The maximum RAM limit is inherently determined by the JVM's available memory.
+ * Yet, an {@link IndexWriter} session can consume a significantly larger amount
+ * of memory than the given RAM limit since this limit is just an indicator of when
+ * to flush memory-resident documents to the Directory. Flushes are likely to happen
+ * concurrently while other threads are adding documents to the writer. For application
+ * stability, the available memory in the JVM should be significantly larger than
+ * the RAM buffer used for indexing.
* <p>
* <b>NOTE</b>: the account of RAM usage for pending deletions is only
* approximate. Specifically, if you delete by Query, Lucene currently has no
@@ -396,16 +408,15 @@ public final class IndexWriterConfig imp
* periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
* to flush by count instead of RAM usage (each buffered delete Query counts
* as one).
- *
* <p>
- * <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
- * internal storage, the absolute maximum value for this setting is somewhat
- * less than 2048 MB. The precise limit depends on various factors, such as
- * how large your documents are, how many fields have norms, etc., so it's
- * best to set this value comfortably under 2048.
- *
+ * <b>NOTE</b>: It's not guaranteed that all memory-resident documents are flushed
+ * once this limit is exceeded. Depending on the configured {@link FlushPolicy}, only a
+ * subset of the buffered documents may be flushed, and therefore only part of the RAM
+ * buffer is released.
* <p>
+ *
* The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
+ * @see #setFlushPolicy(FlushPolicy)
*
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
@@ -413,12 +424,9 @@ public final class IndexWriterConfig imp
* @throws IllegalArgumentException
* if ramBufferSize is enabled but non-positive, or it disables
* ramBufferSize when maxBufferedDocs is already disabled
+ *
*/
public IndexWriterConfig setRAMBufferSizeMB(double ramBufferSizeMB) {
- if (ramBufferSizeMB > 2048.0) {
- throw new IllegalArgumentException("ramBufferSize " + ramBufferSizeMB
- + " is too large; should be comfortably less than 2048");
- }
if (ramBufferSizeMB != DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0)
throw new IllegalArgumentException(
"ramBufferSize should be > 0.0 MB when enabled");
@@ -453,7 +461,7 @@ public final class IndexWriterConfig imp
* document is added, updated or deleted.
*
* @see #setRAMBufferSizeMB(double)
- *
+ * @see #setFlushPolicy(FlushPolicy)
* @throws IllegalArgumentException
* if maxBufferedDocs is enabled but smaller than 2, or it disables
* maxBufferedDocs when ramBufferSize is already disabled
@@ -607,6 +615,53 @@ public final class IndexWriterConfig imp
public int getReaderTermsIndexDivisor() {
return readerTermsIndexDivisor;
}
+
+ /**
+ * Expert: Controls when segments are flushed to disk during indexing.
+ * The {@link FlushPolicy} is initialized during {@link IndexWriter}
+ * instantiation; once initialized, the given instance is bound to that
+ * {@link IndexWriter} and should not be used with another writer.
+ *
+ * @param flushPolicy the flush policy to use
+ * @return this config instance, for method chaining
+ * @see #setMaxBufferedDeleteTerms(int)
+ * @see #setMaxBufferedDocs(int)
+ * @see #setRAMBufferSizeMB(double)
+ */
+ public IndexWriterConfig setFlushPolicy(FlushPolicy flushPolicy) {
+ this.flushPolicy = flushPolicy;
+ return this;
+ }
+
+ /**
+ * Expert: Sets the maximum memory consumption per thread triggering a forced
+ * flush if exceeded. A {@link DocumentsWriterPerThread} is forcefully flushed
+ * once it exceeds this limit even if the {@link #getRAMBufferSizeMB() RAM buffer size}
+ * has not been exceeded. This is a safety limit to prevent a
+ * {@link DocumentsWriterPerThread} from address space exhaustion due to its
+ * internal 32 bit signed integer based memory addressing.
+ * The given value must be greater than 0 and less than 2GB (2048MB).
+ *
+ * @param perThreadHardLimitMB the per-thread hard limit in MB
+ * @return this config instance, for method chaining
+ * @throws IllegalArgumentException if perThreadHardLimitMB is not greater
+ * than 0 and less than 2048
+ * @see #DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB
+ */
+ public IndexWriterConfig setRAMPerThreadHardLimitMB(int perThreadHardLimitMB) {
+ if (perThreadHardLimitMB <= 0 || perThreadHardLimitMB >= 2048) {
+ throw new IllegalArgumentException("PerThreadHardLimit must be greater than 0 and less than 2048MB");
+ }
+ this.perThreadHardLimitMB = perThreadHardLimitMB;
+ return this;
+ }
+
+ /**
+ * Returns the per-thread RAM hard limit, in MB. A
+ * {@link DocumentsWriterPerThread} that grows beyond this amount is
+ * flushed forcefully.
+ *
+ * @return the configured per-thread hard limit in MB
+ * @see #setRAMPerThreadHardLimitMB(int)
+ */
+ public int getRAMPerThreadHardLimitMB() {
+ return this.perThreadHardLimitMB;
+ }
+
+ /**
+ * Returns the {@link FlushPolicy} set via {@link #setFlushPolicy(FlushPolicy)}.
+ * May return {@code null} if no policy has been set on this config.
+ *
+ * @return the configured flush policy, possibly {@code null}
+ * @see #setFlushPolicy(FlushPolicy)
+ */
+ public FlushPolicy getFlushPolicy() {
+ return flushPolicy;
+ }
@Override
public String toString() {
@@ -631,6 +686,10 @@ public final class IndexWriterConfig imp
sb.append("maxThreadStates=").append(indexerThreadPool.getMaxThreadStates()).append("\n");
sb.append("readerPooling=").append(readerPooling).append("\n");
sb.append("readerTermsIndexDivisor=").append(readerTermsIndexDivisor).append("\n");
+ sb.append("flushPolicy=").append(flushPolicy).append("\n");
+ sb.append("perThreadHardLimitMB=").append(perThreadHardLimitMB).append("\n");
+
return sb.toString();
}
+
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java Wed Mar 30 13:30:07 2011
@@ -1,5 +1,7 @@
package org.apache.lucene.index;
+import java.util.Arrays;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -36,6 +38,10 @@ final class IntBlockPool {
public void reset() {
if (bufferUpto != -1) {
// Reuse first buffer
+ if (bufferUpto > 0) {
+ docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
+ Arrays.fill(buffers, 1, bufferUpto, null);
+ }
bufferUpto = 0;
intUpto = 0;
intOffset = 0;
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java Wed Mar 30 13:30:07 2011
@@ -52,13 +52,14 @@ final class TermsHash extends InvertedDo
// Used by perField to obtain terms from the analysis chain
final BytesRef termBytesRef = new BytesRef(10);
- boolean trackAllocations;
+ final boolean trackAllocations;
- public TermsHash(final DocumentsWriterPerThread docWriter, final TermsHashConsumer consumer, final TermsHash nextTermsHash) {
+ public TermsHash(final DocumentsWriterPerThread docWriter, final TermsHashConsumer consumer, boolean trackAllocations, final TermsHash nextTermsHash) {
this.docState = docWriter.docState;
this.docWriter = docWriter;
this.consumer = consumer;
+ this.trackAllocations = trackAllocations;
this.nextTermsHash = nextTermsHash;
intPool = new IntBlockPool(docWriter);
bytePool = new ByteBlockPool(docWriter.byteBlockAllocator);
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java Wed Mar 30 13:30:07 2011
@@ -63,8 +63,8 @@ final class TermsHashPerField extends In
termBytePool = termsHash.termBytePool;
docState = termsHash.docState;
this.termsHash = termsHash;
- bytesUsed = termsHash.trackAllocations?termsHash.docWriter.bytesUsed:new AtomicLong();
-
+ bytesUsed = termsHash.trackAllocations ? termsHash.docWriter.bytesUsed
+ : new AtomicLong();
fieldState = docInverterPerField.fieldState;
this.consumer = termsHash.consumer.addField(this, fieldInfo);
PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
@@ -311,7 +311,7 @@ final class TermsHashPerField extends In
@Override
public int[] clear() {
if(perField.postingsArray != null) {
- bytesUsed.addAndGet(-perField.postingsArray.size * perField.postingsArray.bytesPerPosting());
+ bytesUsed.addAndGet(-(perField.postingsArray.size * perField.postingsArray.bytesPerPosting()));
perField.postingsArray = null;
}
return null;
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Wed Mar 30 13:30:07 2011
@@ -1,6 +1,20 @@
package org.apache.lucene.index;
-
-import java.util.Iterator;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -11,6 +25,7 @@ public class ThreadAffinityDocumentsWrit
public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
super(maxNumPerThreads);
+ // getAndLock() asserts it always obtains a non-null ThreadState, which
+ // requires the pool to allow at least one thread state
+ assert getMaxThreadStates() >= 1;
}
@Override
@@ -21,25 +36,25 @@ public class ThreadAffinityDocumentsWrit
return threadState;
}
}
-
- // find the state that has minimum amount of threads waiting
- Iterator<ThreadState> it = getActivePerThreadsIterator();
ThreadState minThreadState = null;
- while (it.hasNext()) {
- ThreadState state = it.next();
- if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
- minThreadState = state;
- }
- }
-
+ // find the state that has minimum amount of threads waiting
+ minThreadState = minContendedThreadState();
if (minThreadState == null || minThreadState.hasQueuedThreads()) {
ThreadState newState = newThreadState();
if (newState != null) {
minThreadState = newState;
threadBindings.put(requestingThread, newState);
+ } else if (minThreadState == null) {
+ /*
+ * No new ThreadState is available, so we just take the least contended one.
+ * This must return a valid thread state since we accessed the
+ * synced context in newThreadState() above.
+ */
+ minThreadState = minContendedThreadState();
}
}
-
+ assert minThreadState != null: "ThreadState is null";
+
minThreadState.lock();
return minThreadState;
}
@@ -53,4 +68,5 @@ public class ThreadAffinityDocumentsWrit
public void clearAllThreadBindings() {
threadBindings.clear();
}
+
}