You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/11/08 22:29:44 UTC
svn commit: r1407271 - in /lucene/dev/branches/branch_4x: ./ lucene/
lucene/core/ lucene/core/src/java/org/apache/lucene/store/
lucene/test-framework/
lucene/test-framework/src/java/org/apache/lucene/store/
lucene/test-framework/src/java/org/apache/luc...
Author: simonw
Date: Thu Nov 8 21:29:43 2012
New Revision: 1407271
URL: http://svn.apache.org/viewvc?rev=1407271&view=rev
Log:
LUCENE-4537: Separate RateLimiter from Directory
Added:
lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
- copied unchanged from r1407268, lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
- copied unchanged from r1407268, lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/lucene/ (props changed)
lucene/dev/branches/branch_4x/lucene/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/lucene/core/ (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
lucene/dev/branches/branch_4x/lucene/test-framework/ (props changed)
lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
Modified: lucene/dev/branches/branch_4x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/CHANGES.txt?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/lucene/CHANGES.txt Thu Nov 8 21:29:43 2012
@@ -82,7 +82,11 @@ API Changes
* LUCENE-4520: ValueSource.getSortField no longer throws IOExceptions
(Alan Woodward)
-
+
+* LUCENE-4537: RateLimiter is now separated from FSDirectory and exposed via
+ RateLimitedDirectoryWrapper. Any Directory can now be rate-limited.
+ (Simon Willnauer)
+
Bug Fixes
* LUCENE-1822: BaseFragListBuilder hard-coded 6 char margin is too naive.
Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java Thu Nov 8 21:29:43 2012
@@ -123,9 +123,6 @@ public abstract class FSDirectory extend
protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
- // null means no limit
- private volatile RateLimiter mergeWriteRateLimiter;
-
// returns the canonical version of the directory, creating it if it doesn't exist.
private static File getCanonicalPath(File file) throws IOException {
return new File(file.getCanonicalPath());
@@ -286,51 +283,7 @@ public abstract class FSDirectory extend
ensureOpen();
ensureCanWrite(name);
- return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
- }
-
- /** Sets the maximum (approx) MB/sec allowed by all write
- * IO performed by merging. Pass null to have no limit.
- *
- * <p><b>NOTE</b>: if merges are already running there is
- * no guarantee this new rate will apply to them; it will
- * only apply for certain to new merges.
- *
- * @lucene.experimental */
- public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
- RateLimiter limiter = mergeWriteRateLimiter;
- if (mbPerSec == null) {
- if (limiter != null) {
- limiter.setMbPerSec(Double.MAX_VALUE);
- mergeWriteRateLimiter = null;
- }
- } else if (limiter != null) {
- limiter.setMbPerSec(mbPerSec);
- } else {
- mergeWriteRateLimiter = new RateLimiter(mbPerSec);
- }
- }
-
- /**
- * Sets the rate limiter to be used to limit (approx) MB/sec allowed
- * by all IO performed when merging. Pass null to have no limit.
- *
- * <p>Passing an instance of rate limiter compared to setting it using
- * {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
- * instance across several directories globally limiting IO when merging
- * across them.
- *
- * @lucene.experimental */
- public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
- this.mergeWriteRateLimiter = mergeWriteRateLimiter;
- }
-
- /** See {@link #setMaxMergeWriteMBPerSec}.
- *
- * @lucene.experimental */
- public Double getMaxMergeWriteMBPerSec() {
- RateLimiter limiter = mergeWriteRateLimiter;
- return limiter == null ? null : limiter.getMbPerSec();
+ return new FSIndexOutput(this, name);
}
protected void ensureCanWrite(String name) throws IOException {
@@ -504,23 +457,18 @@ public abstract class FSDirectory extend
private final String name;
private final RandomAccessFile file;
private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
- private final RateLimiter rateLimiter;
- public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException {
+ public FSIndexOutput(FSDirectory parent, String name) throws IOException {
this.parent = parent;
this.name = name;
file = new RandomAccessFile(new File(parent.directory, name), "rw");
isOpen = true;
- this.rateLimiter = rateLimiter;
}
/** output methods: */
@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
assert isOpen;
- if (rateLimiter != null) {
- rateLimiter.pause(size);
- }
file.write(b, offset, size);
}
Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java Thu Nov 8 21:29:43 2012
@@ -19,75 +19,102 @@ package org.apache.lucene.store;
import org.apache.lucene.util.ThreadInterruptedException;
-/** Simple class to rate limit IO. Typically it's shared
- * across multiple IndexInputs or IndexOutputs (for example
+/** Abstract base class to rate limit IO. Typically implementations are
+ * shared across multiple IndexInputs or IndexOutputs (for example
* those involved all merging). Those IndexInputs and
* IndexOutputs would call {@link #pause} whenever they
* want to read bytes or write bytes. */
-
-public class RateLimiter {
- private volatile double mbPerSec;
- private volatile double nsPerByte;
- private volatile long lastNS;
-
- // TODO: we could also allow eg a sub class to dynamically
- // determine the allowed rate, eg if an app wants to
- // change the allowed rate over time or something
-
- /** mbPerSec is the MB/sec max IO rate */
- public RateLimiter(double mbPerSec) {
- setMbPerSec(mbPerSec);
- }
+public abstract class RateLimiter {
/**
* Sets an updated mb per second rate limit.
*/
- public void setMbPerSec(double mbPerSec) {
- this.mbPerSec = mbPerSec;
- nsPerByte = 1000000000. / (1024*1024*mbPerSec);
- }
-
+ public abstract void setMbPerSec(double mbPerSec);
/**
* The current mb per second rate limit.
*/
- public double getMbPerSec() {
- return this.mbPerSec;
- }
-
+ public abstract double getMbPerSec();
+
/** Pauses, if necessary, to keep the instantaneous IO
- * rate at or below the target. NOTE: multiple threads
- * may safely use this, however the implementation is
- * not perfectly thread safe but likely in practice this
- * is harmless (just means in some rare cases the rate
- * might exceed the target). It's best to call this
- * with a biggish count, not one byte at a time. */
- public void pause(long bytes) {
- if (bytes == 1) {
- return;
+ * rate at or below the target.
+ * <p>
+ * Note: implementations are expected to be safe for use by multiple threads
+ * </p>
+ * @return the pause time in nanoseconds
+ * */
+ public abstract long pause(long bytes);
+
+ /**
+ * Simple class to rate limit IO.
+ */
+ public static class SimpleRateLimiter extends RateLimiter {
+ private volatile double mbPerSec;
+ private volatile double nsPerByte;
+ private volatile long lastNS;
+
+ // TODO: we could also allow eg a sub class to dynamically
+ // determine the allowed rate, eg if an app wants to
+ // change the allowed rate over time or something
+
+ /** mbPerSec is the MB/sec max IO rate */
+ public SimpleRateLimiter(double mbPerSec) {
+ setMbPerSec(mbPerSec);
+ }
+
+ /**
+ * Sets an updated mb per second rate limit.
+ */
+ public void setMbPerSec(double mbPerSec) {
+ this.mbPerSec = mbPerSec;
+ nsPerByte = 1000000000. / (1024*1024*mbPerSec);
+
}
- // TODO: this is purely instantaneous rate; maybe we
- // should also offer decayed recent history one?
- final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
- long curNS = System.nanoTime();
- if (lastNS < curNS) {
- lastNS = curNS;
+ /**
+ * The current mb per second rate limit.
+ */
+ public double getMbPerSec() {
+ return this.mbPerSec;
}
+
+ /** Pauses, if necessary, to keep the instantaneous IO
+ * rate at or below the target. NOTE: multiple threads
+ * may safely use this, however the implementation is
+ * not perfectly thread safe but likely in practice this
+ * is harmless (just means in some rare cases the rate
+ * might exceed the target). It's best to call this
+ * with a biggish count, not one byte at a time.
+ * @return the pause time in nanoseconds
+ * */
+ public long pause(long bytes) {
+ if (bytes == 1) {
+ return 0;
+ }
+
+ // TODO: this is purely instantaneous rate; maybe we
+ // should also offer decayed recent history one?
+ final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
+ long curNS = System.nanoTime();
+ if (lastNS < curNS) {
+ lastNS = curNS;
+ }
- // While loop because Thread.sleep doesn't always sleep
- // enough:
- while(true) {
- final long pauseNS = targetNS - curNS;
- if (pauseNS > 0) {
- try {
- Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
+ // While loop because Thread.sleep doesn't always sleep
+ // enough:
+ while(true) {
+ final long pauseNS = targetNS - curNS;
+ if (pauseNS > 0) {
+ try {
+ Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ curNS = System.nanoTime();
+ continue;
}
- curNS = System.nanoTime();
- continue;
+ break;
}
- break;
+ return targetNS;
}
}
}
Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java Thu Nov 8 21:29:43 2012
@@ -96,8 +96,6 @@ public class MockDirectoryWrapper extend
// is made to delete an open file, we enroll it here.
private Set<String> openFilesDeleted;
- final RateLimiter rateLimiter;
-
private synchronized void init() {
if (openFiles == null) {
openFiles = new HashMap<String,Integer>();
@@ -120,19 +118,6 @@ public class MockDirectoryWrapper extend
.mBitsToBytes(40 + randomState.nextInt(10)), 5 + randomState.nextInt(5), null);
// force wrapping of lockfactory
this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory());
-
- // 2% of the time use rate limiter
- if (randomState.nextInt(50) == 17) {
- // Use RateLimiter
- double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5);
- if (LuceneTestCase.VERBOSE) {
- System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
- }
- rateLimiter = new RateLimiter(maxMBPerSec);
- } else {
- rateLimiter = null;
- }
-
init();
}
@@ -447,7 +432,6 @@ public class MockDirectoryWrapper extend
ramdir.fileMap.put(name, file);
}
}
-
//System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
addFileHandle(io, name, Handle.Output);
@@ -455,7 +439,7 @@ public class MockDirectoryWrapper extend
// throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
if (throttling == Throttling.ALWAYS ||
- (throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
+ (throttling == Throttling.SOMETIMES && randomState.nextInt(50) == 0) && !(delegate instanceof RateLimitedDirectoryWrapper)) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: throttling indexOutput");
}
Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java (original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java Thu Nov 8 21:29:43 2012
@@ -77,11 +77,6 @@ public class MockIndexOutputWrapper exte
public void writeBytes(byte[] b, int offset, int len) throws IOException {
long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
long realUsage = 0;
-
- if (dir.rateLimiter != null && len >= 1000) {
- dir.rateLimiter.pause(len);
- }
-
// If MockRAMDir crashed since we were opened, then
// don't write anything:
if (dir.crashed)
Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Thu Nov 8 21:29:43 2012
@@ -38,6 +38,7 @@ import org.apache.lucene.search.*;
import org.apache.lucene.search.FieldCache.CacheEntry;
import org.apache.lucene.search.QueryUtils.FCInvisibleMultiReader;
import org.apache.lucene.store.*;
+import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.util.FieldCacheSanityChecker.Insanity;
import org.junit.*;
@@ -948,6 +949,27 @@ public abstract class LuceneTestCase ext
if (rarely(random)) {
directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble());
}
+
+ if (rarely(random)) {
+ final double maxMBPerSec = 10 + 5*(random.nextDouble()-0.5);
+ if (LuceneTestCase.VERBOSE) {
+ System.out.println("LuceneTestCase: will rate limit output IndexOutput to " + maxMBPerSec + " MB/sec");
+ }
+ final RateLimitedDirectoryWrapper rateLimitedDirectoryWrapper = new RateLimitedDirectoryWrapper(directory);
+ switch (random.nextInt(10)) {
+ case 3: // sometimes rate limit on flush
+ rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
+ break;
+ case 2: // sometimes rate limit flush & merge
+ rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
+ rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
+ break;
+ default:
+ rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
+ }
+ directory = rateLimitedDirectoryWrapper;
+
+ }
if (bare) {
BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory);
@@ -955,6 +977,7 @@ public abstract class LuceneTestCase ext
return base;
} else {
MockDirectoryWrapper mock = new MockDirectoryWrapper(random, directory);
+
mock.setThrottling(TEST_THROTTLING);
closeAfterSuite(new CloseableDirectory(mock, suiteFailureMarker));
return mock;