You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/03/30 15:30:07 UTC
svn commit: r1086947 [2/2] - in
/lucene/dev/branches/realtime_search/lucene/src:
java/org/apache/lucene/index/ test-framework/org/apache/lucene/store/
test-framework/org/apache/lucene/util/ test/org/apache/lucene/index/
Modified: lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java Wed Mar 30 13:30:07 2011
@@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.ThrottledIndexOutput;
import org.apache.lucene.util._TestUtil;
/**
@@ -56,6 +57,7 @@ public class MockDirectoryWrapper extend
private Set<String> createdFiles;
Set<String> openFilesForWrite = new HashSet<String>();
volatile boolean crashed;
+ private ThrottledIndexOutput throttledOutput;
// use this for tracking files for crash.
// additionally: provides debugging information in case you leave one open
@@ -101,6 +103,10 @@ public class MockDirectoryWrapper extend
public void setPreventDoubleWrite(boolean value) {
preventDoubleWrite = value;
}
+
+ public void setThrottledIndexOutput(ThrottledIndexOutput throttledOutput) {
+ this.throttledOutput = throttledOutput;
+ }
@Override
public synchronized void sync(Collection<String> names) throws IOException {
@@ -335,7 +341,7 @@ public class MockDirectoryWrapper extend
IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name), name);
openFileHandles.put(io, new RuntimeException("unclosed IndexOutput"));
openFilesForWrite.add(name);
- return io;
+ return throttledOutput == null ? io : throttledOutput.newFromDelegate(io);
}
@Override
@@ -547,4 +553,5 @@ public class MockDirectoryWrapper extend
maybeYield();
delegate.copy(to, src, dest);
}
+
}
Added: lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,147 @@
+package org.apache.lucene.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.IndexOutput;
+
+public class ThrottledIndexOutput extends IndexOutput {
+ public static final int DEFAULT_MIN_WRITTEN_BYTES = 1024;
+ private final int bytesPerSecond;
+ private IndexOutput delegate;
+ private long flushDelayMillis;
+ private long closeDelayMillis;
+ private long seekDelayMillis;
+ private long pendingBytes;
+ private long minBytesWritten;
+ private long timeElapsed;
+ private final byte[] bytes = new byte[1];
+
+ public ThrottledIndexOutput newFromDelegate(IndexOutput output) {
+ return new ThrottledIndexOutput(bytesPerSecond, flushDelayMillis,
+ closeDelayMillis, seekDelayMillis, minBytesWritten, output);
+ }
+
+ public ThrottledIndexOutput(int bytesPerSecond, long delayInMillis,
+ IndexOutput delegate) {
+ this(bytesPerSecond, delayInMillis, delayInMillis, delayInMillis,
+ DEFAULT_MIN_WRITTEN_BYTES, delegate);
+ }
+
+ public ThrottledIndexOutput(int bytesPerSecond, long delays,
+ int minBytesWritten, IndexOutput delegate) {
+ this(bytesPerSecond, delays, delays, delays, minBytesWritten, delegate);
+ }
+
+ public static final int mBitsToBytes(int mbits) {
+ return mbits * 125000;
+ }
+
+ public ThrottledIndexOutput(int bytesPerSecond, long flushDelayMillis,
+ long closeDelayMillis, long seekDelayMillis, long minBytesWritten,
+ IndexOutput delegate) {
+ assert bytesPerSecond > 0;
+ this.delegate = delegate;
+ this.bytesPerSecond = bytesPerSecond;
+ this.flushDelayMillis = flushDelayMillis;
+ this.closeDelayMillis = closeDelayMillis;
+ this.seekDelayMillis = seekDelayMillis;
+ this.minBytesWritten = minBytesWritten;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ sleep(flushDelayMillis);
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ sleep(closeDelayMillis + getDelay(true));
+ delegate.close();
+
+ }
+
+ @Override
+ public long getFilePointer() {
+ return delegate.getFilePointer();
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ sleep(seekDelayMillis);
+ delegate.seek(pos);
+ }
+
+ @Override
+ public long length() throws IOException {
+ return delegate.length();
+ }
+
+ @Override
+ public void writeByte(byte b) throws IOException {
+ bytes[0] = b;
+ writeBytes(bytes, 0, 1);
+ }
+
+ @Override
+ public void writeBytes(byte[] b, int offset, int length) throws IOException {
+ final long before = System.nanoTime();
+ delegate.writeBytes(b, offset, length);
+ timeElapsed += System.nanoTime() - before;
+ pendingBytes += length;
+ sleep(getDelay(false));
+
+ }
+
+ protected long getDelay(boolean closing) {
+ if (pendingBytes > 0 && (closing || pendingBytes > minBytesWritten)) {
+ long actualBps = (timeElapsed / pendingBytes) * 1000000000l; // nano to sec
+ if (actualBps > bytesPerSecond) {
+ long expected = (pendingBytes * 1000l / bytesPerSecond) ;
+ final long delay = expected - (timeElapsed / 1000000l) ;
+ pendingBytes = 0;
+ timeElapsed = 0;
+ return delay;
+ }
+ }
+ return 0;
+
+ }
+
+ private static final void sleep(long ms) {
+ if (ms <= 0)
+ return;
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ throw new ThreadInterruptedException(e);
+ }
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ delegate.setLength(length);
+ }
+
+ @Override
+ public void copyBytes(DataInput input, long numBytes) throws IOException {
+ delegate.copyBytes(input, numBytes);
+ }
+}
Added: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,458 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.ThrottledIndexOutput;
+import org.junit.Before;
+
+public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
+
+ private LineFileDocs lineDocFile;
+ private int numCPUs;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ lineDocFile = new LineFileDocs(random);
+ numCPUs = Runtime.getRuntime().availableProcessors();
+ }
+
+ public void testFlushByRam() throws CorruptIndexException,
+ LockObtainFailedException, IOException, InterruptedException {
+ int[] numThreads = new int[] { numCPUs + random.nextInt(numCPUs + 1), 1 };
+ for (int i = 0; i < numThreads.length; i++) {
+ runFlushByRam(numThreads[i],
+ 1 + random.nextInt(10) + random.nextDouble(), false);
+ }
+
+ for (int i = 0; i < numThreads.length; i++) {
+ // with a 250 mb ram buffer we should never stall
+ runFlushByRam(numThreads[i], 250.d, true);
+ }
+ }
+
+ protected void runFlushByRam(int numThreads, double maxRam,
+ boolean ensureNotStalled) throws IOException, CorruptIndexException,
+ LockObtainFailedException, InterruptedException {
+ final int numDocumentsToIndex = 50 + random.nextInt(150);
+ AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+ Directory dir = newDirectory();
+ MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+ new MockAnalyzer()).setFlushPolicy(flushPolicy);
+
+ final int numDWPT = 1 + random.nextInt(8);
+ DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+ numDWPT);
+ iwc.setIndexerThreadPool(threadPool);
+ iwc.setRAMBufferSizeMB(1 + random.nextInt(10) + random.nextDouble());
+ iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+ iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+ IndexWriter writer = new IndexWriter(dir, iwc);
+ assertFalse(flushPolicy.flushOnDocCount());
+ assertFalse(flushPolicy.flushOnDeleteTerms());
+ assertTrue(flushPolicy.flushOnRAM());
+ DocumentsWriter docsWriter = writer.getDocsWriter();
+ assertNotNull(docsWriter);
+ DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
+ assertEquals(" bytes must be 0 after init", 0, flushControl.flushBytes());
+
+ IndexThread[] threads = new IndexThread[numThreads];
+ for (int x = 0; x < threads.length; x++) {
+ threads[x] = new IndexThread(numDocs, numThreads, writer, lineDocFile,
+ false);
+ threads[x].start();
+ }
+
+ for (int x = 0; x < threads.length; x++) {
+ threads[x].join();
+ }
+ final long maxRAMBytes = (long) (iwc.getRAMBufferSizeMB() * 1024. * 1024.);
+ assertEquals(" all flushes must be due numThreads=" + numThreads, 0,
+ flushControl.flushBytes());
+ assertEquals(numDocumentsToIndex, writer.numDocs());
+ assertEquals(numDocumentsToIndex, writer.maxDoc());
+ assertTrue("peak bytes without flush exceeded watermark",
+ flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
+ assertActiveBytesAfter(flushControl);
+ if (flushPolicy.hasMarkedPending) {
+ assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
+ }
+ if (ensureNotStalled) {
+ assertFalse(docsWriter.healthiness.wasStalled);
+ }
+ writer.close();
+ assertEquals(0, flushControl.activeBytes());
+ dir.close();
+ }
+
+ public void testFlushDocCount() throws CorruptIndexException,
+ LockObtainFailedException, IOException, InterruptedException {
+ int[] numThreads = new int[] { numCPUs + random.nextInt(numCPUs + 1), 1 };
+ for (int i = 0; i < numThreads.length; i++) {
+
+ final int numDocumentsToIndex = 50 + random.nextInt(150);
+ AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+ Directory dir = newDirectory();
+ MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+ new MockAnalyzer()).setFlushPolicy(flushPolicy);
+
+ final int numDWPT = 1 + random.nextInt(8);
+ DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+ numDWPT);
+ iwc.setIndexerThreadPool(threadPool);
+ iwc.setMaxBufferedDocs(2 + random.nextInt(50));
+ iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+ iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+ IndexWriter writer = new IndexWriter(dir, iwc);
+ assertTrue(flushPolicy.flushOnDocCount());
+ assertFalse(flushPolicy.flushOnDeleteTerms());
+ assertFalse(flushPolicy.flushOnRAM());
+ DocumentsWriter docsWriter = writer.getDocsWriter();
+ assertNotNull(docsWriter);
+ DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
+ assertEquals(" bytes must be 0 after init", 0, flushControl.flushBytes());
+
+ IndexThread[] threads = new IndexThread[numThreads[i]];
+ for (int x = 0; x < threads.length; x++) {
+ threads[x] = new IndexThread(numDocs, numThreads[i], writer,
+ lineDocFile, false);
+ threads[x].start();
+ }
+
+ for (int x = 0; x < threads.length; x++) {
+ threads[x].join();
+ }
+
+ assertEquals(" all flushes must be due numThreads=" + numThreads[i], 0,
+ flushControl.flushBytes());
+ assertEquals(numDocumentsToIndex, writer.numDocs());
+ assertEquals(numDocumentsToIndex, writer.maxDoc());
+ assertTrue("peak bytes without flush exceeded watermark",
+ flushPolicy.peakDocCountWithoutFlush <= iwc.getMaxBufferedDocs());
+ assertActiveBytesAfter(flushControl);
+ writer.close();
+ assertEquals(0, flushControl.activeBytes());
+ dir.close();
+ }
+ }
+
+ public void testFlushPolicySetup() throws IOException {
+ Directory dir = newDirectory();
+ FlushByRamOrCountsPolicy flushPolicy = new FlushByRamOrCountsPolicy();
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+ new MockAnalyzer()).setFlushPolicy(flushPolicy);
+
+ final int numDWPT = 1 + random.nextInt(10);
+ DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+ numDWPT);
+ iwc.setIndexerThreadPool(threadPool);
+ double maxMB = 1.0 + Math.ceil(random.nextDouble());
+ iwc.setRAMBufferSizeMB(maxMB);
+ iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+
+ IndexWriter writer = new IndexWriter(dir, iwc);
+ assertEquals((long) (maxMB * 1024. * 1024. * 2.),
+ flushPolicy.getMaxNetBytes());
+
+ writer.close();
+ dir.close();
+ }
+
+ public void testRandom() throws IOException, InterruptedException {
+ final int numThreads = 1 + random.nextInt(8);
+ final int numDocumentsToIndex = 100 + random.nextInt(300);
+ AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+ new MockAnalyzer());
+ MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
+ iwc.setFlushPolicy(flushPolicy);
+
+ final int numDWPT = 1 + random.nextInt(8);
+ DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+ numDWPT);
+ iwc.setIndexerThreadPool(threadPool);
+
+ IndexWriter writer = new IndexWriter(dir, iwc);
+ DocumentsWriter docsWriter = writer.getDocsWriter();
+ assertNotNull(docsWriter);
+ DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
+
+ assertEquals(" bytes must be 0 after init", 0, flushControl.flushBytes());
+
+ IndexThread[] threads = new IndexThread[numThreads];
+ for (int x = 0; x < threads.length; x++) {
+ threads[x] = new IndexThread(numDocs, numThreads, writer, lineDocFile,
+ true);
+ threads[x].start();
+ }
+
+ for (int x = 0; x < threads.length; x++) {
+ threads[x].join();
+ }
+
+ assertEquals(" all flushes must be due", 0, flushControl.flushBytes());
+ assertEquals(numDocumentsToIndex, writer.numDocs());
+ assertEquals(numDocumentsToIndex, writer.maxDoc());
+ if (flushPolicy.flushOnRAM() && !flushPolicy.flushOnDocCount()
+ && !flushPolicy.flushOnDeleteTerms()) {
+ final long maxRAMBytes = (long) (iwc.getRAMBufferSizeMB() * 1024. * 1024.);
+ assertTrue("peak bytes without flush exceeded watermark",
+ flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
+ if (flushPolicy.hasMarkedPending) {
+ assertTrue("max: " + maxRAMBytes + " " + flushControl.peakActiveBytes,
+ maxRAMBytes <= flushControl.peakActiveBytes);
+ }
+ }
+ assertActiveBytesAfter(flushControl);
+ writer.commit();
+ assertEquals(0, flushControl.activeBytes());
+ IndexReader r = IndexReader.open(dir);
+ assertEquals(numDocumentsToIndex, r.numDocs());
+ assertEquals(numDocumentsToIndex, r.maxDoc());
+ if (!flushPolicy.flushOnRAM()) {
+ assertFalse("never stall if we don't flush on RAM", docsWriter.healthiness.wasStalled);
+ assertFalse("never block if we don't flush on RAM", docsWriter.healthiness.hasBlocked());
+ }
+ r.close();
+ writer.close();
+ dir.close();
+ }
+
+ public void testHealthyness() throws InterruptedException,
+ CorruptIndexException, LockObtainFailedException, IOException {
+
+ int[] numThreads = new int[] { 3 + random.nextInt(8), 1 };
+ final int numDocumentsToIndex = 50 + random.nextInt(50);
+ for (int i = 0; i < numThreads.length; i++) {
+ AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+ MockDirectoryWrapper dir = newDirectory();
+ // mock a very slow harddisk here so that flushing is very slow
+ dir.setThrottledIndexOutput(new ThrottledIndexOutput(ThrottledIndexOutput
+ .mBitsToBytes(50 + random.nextInt(10)), 5 + random.nextInt(5), null));
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+ new MockAnalyzer());
+ iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+ iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+ FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
+ iwc.setFlushPolicy(flushPolicy);
+
+ DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+ numThreads[i]== 1 ? 1 : 2);
+ iwc.setIndexerThreadPool(threadPool);
+ // with such a small ram buffer we should be stalled quiet quickly
+ iwc.setRAMBufferSizeMB(0.25);
+ IndexWriter writer = new IndexWriter(dir, iwc);
+ IndexThread[] threads = new IndexThread[numThreads[i]];
+ for (int x = 0; x < threads.length; x++) {
+ threads[x] = new IndexThread(numDocs, numThreads[i], writer,
+ lineDocFile, false);
+ threads[x].start();
+ }
+
+ for (int x = 0; x < threads.length; x++) {
+ threads[x].join();
+ }
+ DocumentsWriter docsWriter = writer.getDocsWriter();
+ assertNotNull(docsWriter);
+ DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
+ assertEquals(" all flushes must be due", 0, flushControl.flushBytes());
+ assertEquals(numDocumentsToIndex, writer.numDocs());
+ assertEquals(numDocumentsToIndex, writer.maxDoc());
+ if (flushControl.peakNetBytes > (long)(iwc.getRAMBufferSizeMB() * 1024d * 1024d * 2d)) {
+ assertTrue("should be unhealthy here numThreads: " + numThreads[i],
+ docsWriter.healthiness.wasStalled);
+ }
+
+ if (numThreads[i] == 1) { // single thread could be unhealthy is a single
+ // doc is very large?!
+ assertFalse(
+ "single thread must not block numThreads: " + numThreads[i],
+ docsWriter.healthiness.hasBlocked());
+ } else {
+ if (docsWriter.healthiness.wasStalled) {
+ // TODO maybe this assumtion is too strickt
+ assertTrue(" we should have blocked here numThreads: "
+ + numThreads[i], docsWriter.healthiness.hasBlocked());
+ }
+ }
+ assertActiveBytesAfter(flushControl);
+ writer.close(true);
+ dir.close();
+ }
+ }
+
+ protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
+ Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
+ long bytesUsed = 0;
+ while (allActiveThreads.hasNext()) {
+ bytesUsed += allActiveThreads.next().perThread.bytesUsed();
+ }
+ assertEquals(bytesUsed, flushControl.activeBytes());
+ }
+
+ public class IndexThread extends Thread {
+ IndexWriter writer;
+ IndexWriterConfig iwc;
+ LineFileDocs docs;
+ private AtomicInteger pendingDocs;
+ private final boolean doRandomCommit;
+
+ public IndexThread(AtomicInteger pendingDocs, int numThreads,
+ IndexWriter writer, LineFileDocs docs, boolean doRandomCommit) {
+ this.pendingDocs = pendingDocs;
+ this.writer = writer;
+ iwc = writer.getConfig();
+ this.docs = docs;
+ this.doRandomCommit = doRandomCommit;
+ }
+
+ public void run() {
+ try {
+ long ramSize = 0;
+ while (pendingDocs.decrementAndGet() > -1) {
+ Document doc = docs.nextDoc();
+ writer.addDocument(doc);
+ long newRamSize = writer.ramSizeInBytes();
+ if (newRamSize != ramSize) {
+ ramSize = newRamSize;
+ }
+ if (doRandomCommit) {
+ int commit;
+ synchronized (random) {
+ commit = random.nextInt(20);
+ }
+ if (commit == 0) {
+ writer.commit();
+ }
+ }
+ }
+ } catch (Throwable ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private static class MockDefaultFlushPolicy extends FlushByRamOrCountsPolicy {
+ long peakBytesWithoutFlush = Integer.MIN_VALUE;
+ long peakDocCountWithoutFlush = Integer.MIN_VALUE;
+ boolean hasMarkedPending = false;
+
+ @Override
+ public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
+ final ArrayList<ThreadState> pending = new ArrayList<DocumentsWriterPerThreadPool.ThreadState>();
+ final ArrayList<ThreadState> notPending = new ArrayList<DocumentsWriterPerThreadPool.ThreadState>();
+ findPending(control, pending, notPending);
+ final boolean flushCurrent = state.flushPending;
+ final ThreadState toFlush;
+ if (state.flushPending) {
+ toFlush = state;
+ } else if (flushOnDeleteTerms()
+ && state.perThread.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
+ .getMaxBufferedDeleteTerms()) {
+ toFlush = state;
+ } else {
+ toFlush = null;
+ }
+ super.onDelete(control, state);
+ if (toFlush != null) {
+ if (flushCurrent) {
+ assertTrue(pending.remove(toFlush));
+ } else {
+ assertTrue(notPending.remove(toFlush));
+ }
+ assertTrue(toFlush.flushPending);
+ hasMarkedPending = true;
+ }
+
+ for (ThreadState threadState : notPending) {
+ assertFalse(threadState.flushPending);
+ }
+ }
+
+ @Override
+ public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
+ final ArrayList<ThreadState> pending = new ArrayList<DocumentsWriterPerThreadPool.ThreadState>();
+ final ArrayList<ThreadState> notPending = new ArrayList<DocumentsWriterPerThreadPool.ThreadState>();
+ findPending(control, pending, notPending);
+ final boolean flushCurrent = state.flushPending;
+ long activeBytes = control.activeBytes();
+ final ThreadState toFlush;
+ if (state.flushPending) {
+ toFlush = state;
+ } else if (flushOnDocCount()
+ && state.perThread.getNumDocsInRAM() >= indexWriterConfig
+ .getMaxBufferedDocs()) {
+ toFlush = state;
+ } else if (flushOnRAM()
+ && activeBytes >= (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024. * 1024.)) {
+ toFlush = findLargestNonPendingWriter(control, state);
+ assertFalse(toFlush.flushPending);
+ } else {
+ toFlush = null;
+ }
+ super.onInsert(control, state);
+ if (toFlush != null) {
+ if (flushCurrent) {
+ assertTrue(pending.remove(toFlush));
+ } else {
+ assertTrue(notPending.remove(toFlush));
+ }
+ assertTrue(toFlush.flushPending);
+ hasMarkedPending = true;
+ } else {
+ peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
+ peakDocCountWithoutFlush = Math.max(state.perThread.getNumDocsInRAM(),
+ peakDocCountWithoutFlush);
+ }
+
+ for (ThreadState threadState : notPending) {
+ assertFalse(threadState.flushPending);
+ }
+ }
+ }
+
+ static void findPending(DocumentsWriterFlushControl flushControl,
+ ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) {
+ Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
+ while (allActiveThreads.hasNext()) {
+ ThreadState next = allActiveThreads.next();
+ if (next.flushPending) {
+ pending.add(next);
+ } else {
+ notPending.add(next);
+ }
+ }
+ }
+}
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Wed Mar 30 13:30:07 2011
@@ -759,13 +759,14 @@ public class TestIndexWriter extends Luc
writer.deleteDocuments(new Term("field", "aaa" + j));
_TestUtil.syncConcurrentMerges(writer);
int flushCount = writer.getFlushCount();
+
if (j == 1)
lastFlushCount = flushCount;
else if (j < 10) {
// No new files should be created
assertEquals(flushCount, lastFlushCount);
} else if (10 == j) {
- assertTrue(flushCount > lastFlushCount);
+ assertTrue("" + j, flushCount > lastFlushCount);
lastFlushCount = flushCount;
writer.getConfig().setRAMBufferSizeMB(0.000001);
writer.getConfig().setMaxBufferedDeleteTerms(1);
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java Wed Mar 30 13:30:07 2011
@@ -70,6 +70,10 @@ public class TestIndexWriterConfig exten
assertEquals(IndexWriterConfig.DEFAULT_READER_TERMS_INDEX_DIVISOR, conf.getReaderTermsIndexDivisor());
assertEquals(LogByteSizeMergePolicy.class, conf.getMergePolicy().getClass());
assertEquals(ThreadAffinityDocumentsWriterThreadPool.class, conf.getIndexerThreadPool().getClass());
+ assertNull(conf.getFlushPolicy());
+ assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB());
+
+
// Sanity check - validate that all getters are covered.
Set<String> getters = new HashSet<String>();
@@ -94,6 +98,9 @@ public class TestIndexWriterConfig exten
getters.add("getReaderPooling");
getters.add("getIndexerThreadPool");
getters.add("getReaderTermsIndexDivisor");
+ getters.add("getFlushPolicy");
+ getters.add("getRAMPerThreadHardLimitMB");
+
for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {
assertTrue("method " + m.getName() + " is not tested for defaults", getters.contains(m.getName()));
@@ -241,6 +248,20 @@ public class TestIndexWriterConfig exten
// this is expected
}
+ try {
+ conf.setRAMPerThreadHardLimitMB(2048);
+ fail("should not have succeeded to set RAMPerThreadHardLimitMB to >= 2048");
+ } catch (IllegalArgumentException e) {
+ // this is expected
+ }
+
+ try {
+ conf.setRAMPerThreadHardLimitMB(0);
+ fail("should not have succeeded to set RAMPerThreadHardLimitMB to 0");
+ } catch (IllegalArgumentException e) {
+ // this is expected
+ }
+
assertEquals(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, conf.getMaxThreadStates());
conf.setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(5));
assertEquals(5, conf.getMaxThreadStates());
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Wed Mar 30 13:30:07 2011
@@ -124,7 +124,7 @@ public class TestIndexWriterDelete exten
writer.close();
dir.close();
}
-
+
// test when delete terms only apply to ram segments
public void testRAMDeletes() throws IOException {
for(int t=0;t<2;t++) {