You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/11/08 22:29:44 UTC

svn commit: r1407271 - in /lucene/dev/branches/branch_4x: ./ lucene/ lucene/core/ lucene/core/src/java/org/apache/lucene/store/ lucene/test-framework/ lucene/test-framework/src/java/org/apache/lucene/store/ lucene/test-framework/src/java/org/apache/luc...

Author: simonw
Date: Thu Nov  8 21:29:43 2012
New Revision: 1407271

URL: http://svn.apache.org/viewvc?rev=1407271&view=rev
Log:
LUCENE-4537: Separate RateLimiter from Directory

Added:
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
      - copied unchanged from r1407268, lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
      - copied unchanged from r1407268, lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/lucene/   (props changed)
    lucene/dev/branches/branch_4x/lucene/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/lucene/core/   (props changed)
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
    lucene/dev/branches/branch_4x/lucene/test-framework/   (props changed)
    lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
    lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
    lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java

Modified: lucene/dev/branches/branch_4x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/CHANGES.txt?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/lucene/CHANGES.txt Thu Nov  8 21:29:43 2012
@@ -82,7 +82,11 @@ API Changes
 
 * LUCENE-4520: ValueSource.getSortField no longer throws IOExceptions
   (Alan Woodward)
-
+  
+* LUCENE-4537: RateLimiter is now separated from FSDirectory and exposed via
+  RateLimitingDirectoryWrapper. Any Directory can now be rate-limited.
+  (Simon Willnauer)  
+  
 Bug Fixes
 
 * LUCENE-1822: BaseFragListBuilder hard-coded 6 char margin is too naive.

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java Thu Nov  8 21:29:43 2012
@@ -123,9 +123,6 @@ public abstract class FSDirectory extend
   protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
   private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
 
-  // null means no limit
-  private volatile RateLimiter mergeWriteRateLimiter;
-
   // returns the canonical version of the directory, creating it if it doesn't exist.
   private static File getCanonicalPath(File file) throws IOException {
     return new File(file.getCanonicalPath());
@@ -286,51 +283,7 @@ public abstract class FSDirectory extend
     ensureOpen();
 
     ensureCanWrite(name);
-    return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
-  }
-
-  /** Sets the maximum (approx) MB/sec allowed by all write
-   *  IO performed by merging.  Pass null to have no limit.
-   *
-   *  <p><b>NOTE</b>: if merges are already running there is
-   *  no guarantee this new rate will apply to them; it will
-   *  only apply for certain to new merges.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    if (mbPerSec == null) {
-      if (limiter != null) {
-        limiter.setMbPerSec(Double.MAX_VALUE);
-        mergeWriteRateLimiter = null;
-      }
-    } else if (limiter != null) {
-      limiter.setMbPerSec(mbPerSec);
-    } else {
-      mergeWriteRateLimiter = new RateLimiter(mbPerSec);
-    }
-  }
-
-  /**
-   * Sets the rate limiter to be used to limit (approx) MB/sec allowed
-   * by all IO performed when merging. Pass null to have no limit.
-   *
-   * <p>Passing an instance of rate limiter compared to setting it using
-   * {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
-   * instance across several directories globally limiting IO when merging
-   * across them.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
-    this.mergeWriteRateLimiter = mergeWriteRateLimiter;
-  }
-
-  /** See {@link #setMaxMergeWriteMBPerSec}.
-   *
-   * @lucene.experimental */
-  public Double getMaxMergeWriteMBPerSec() {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    return limiter == null ? null : limiter.getMbPerSec();
+    return new FSIndexOutput(this, name);
   }
 
   protected void ensureCanWrite(String name) throws IOException {
@@ -504,23 +457,18 @@ public abstract class FSDirectory extend
     private final String name;
     private final RandomAccessFile file;
     private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
-    private final RateLimiter rateLimiter;
     
-    public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException {
+    public FSIndexOutput(FSDirectory parent, String name) throws IOException {
       this.parent = parent;
       this.name = name;
       file = new RandomAccessFile(new File(parent.directory, name), "rw");
       isOpen = true;
-      this.rateLimiter = rateLimiter;
     }
 
     /** output methods: */
     @Override
     public void flushBuffer(byte[] b, int offset, int size) throws IOException {
       assert isOpen;
-      if (rateLimiter != null) {
-        rateLimiter.pause(size);
-      }
       file.write(b, offset, size);
     }
     

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java Thu Nov  8 21:29:43 2012
@@ -19,75 +19,102 @@ package org.apache.lucene.store;
 
 import org.apache.lucene.util.ThreadInterruptedException;
 
-/** Simple class to rate limit IO.  Typically it's shared
- *  across multiple IndexInputs or IndexOutputs (for example
+/** Abstract base class to rate limit IO.  Typically implementations are
+ *  shared across multiple IndexInputs or IndexOutputs (for example
  *  those involved all merging).  Those IndexInputs and
  *  IndexOutputs would call {@link #pause} whenever they
  *  want to read bytes or write bytes. */
-
-public class RateLimiter {
-  private volatile double mbPerSec;
-  private volatile double nsPerByte;
-  private volatile long lastNS;
-
-  // TODO: we could also allow eg a sub class to dynamically
-  // determine the allowed rate, eg if an app wants to
-  // change the allowed rate over time or something
-
-  /** mbPerSec is the MB/sec max IO rate */
-  public RateLimiter(double mbPerSec) {
-    setMbPerSec(mbPerSec);
-  }
+public abstract class RateLimiter {
 
   /**
    * Sets an updated mb per second rate limit.
    */
-  public void setMbPerSec(double mbPerSec) {
-    this.mbPerSec = mbPerSec;
-    nsPerByte = 1000000000. / (1024*1024*mbPerSec);
-  }
-
+  public abstract void setMbPerSec(double mbPerSec);
   /**
    * The current mb per second rate limit.
    */
-  public double getMbPerSec() {
-    return this.mbPerSec;
-  }
-
+  public abstract double getMbPerSec();
+  
   /** Pauses, if necessary, to keep the instantaneous IO
-   *  rate at or below the target. NOTE: multiple threads
-   *  may safely use this, however the implementation is
-   *  not perfectly thread safe but likely in practice this
-   *  is harmless (just means in some rare cases the rate
-   *  might exceed the target).  It's best to call this
-   *  with a biggish count, not one byte at a time. */
-  public void pause(long bytes) {
-    if (bytes == 1) {
-      return;
+   *  rate at or below the target. 
+   *  <p>
+   *  Note: the implementation is thread-safe
+   *  </p>
+   *  @return the pause time in nano seconds 
+   * */
+  public abstract long pause(long bytes);
+  
+  /**
+   * Simple class to rate limit IO.
+   */
+  public static class SimpleRateLimiter extends RateLimiter {
+    private volatile double mbPerSec;
+    private volatile double nsPerByte;
+    private volatile long lastNS;
+
+    // TODO: we could also allow eg a sub class to dynamically
+    // determine the allowed rate, eg if an app wants to
+    // change the allowed rate over time or something
+
+    /** mbPerSec is the MB/sec max IO rate */
+    public SimpleRateLimiter(double mbPerSec) {
+      setMbPerSec(mbPerSec);
+    }
+
+    /**
+     * Sets an updated mb per second rate limit.
+     */
+    public void setMbPerSec(double mbPerSec) {
+      this.mbPerSec = mbPerSec;
+      nsPerByte = 1000000000. / (1024*1024*mbPerSec);
+      
     }
 
-    // TODO: this is purely instantaneous rate; maybe we
-    // should also offer decayed recent history one?
-    final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
-    long curNS = System.nanoTime();
-    if (lastNS < curNS) {
-      lastNS = curNS;
+    /**
+     * The current mb per second rate limit.
+     */
+    public double getMbPerSec() {
+      return this.mbPerSec;
     }
+    
+    /** Pauses, if necessary, to keep the instantaneous IO
+     *  rate at or below the target. NOTE: multiple threads
+     *  may safely use this, however the implementation is
+     *  not perfectly thread safe but likely in practice this
+     *  is harmless (just means in some rare cases the rate
+     *  might exceed the target).  It's best to call this
+     *  with a biggish count, not one byte at a time.
+     *  @return the pause time in nano seconds 
+     * */
+    public long pause(long bytes) {
+      if (bytes == 1) {
+        return 0;
+      }
+
+      // TODO: this is purely instantaneous rate; maybe we
+      // should also offer decayed recent history one?
+      final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
+      long curNS = System.nanoTime();
+      if (lastNS < curNS) {
+        lastNS = curNS;
+      }
 
-    // While loop because Thread.sleep doesn't always sleep
-    // enough:
-    while(true) {
-      final long pauseNS = targetNS - curNS;
-      if (pauseNS > 0) {
-        try {
-          Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
+      // While loop because Thread.sleep doesn't always sleep
+      // enough:
+      while(true) {
+        final long pauseNS = targetNS - curNS;
+        if (pauseNS > 0) {
+          try {
+            Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+          curNS = System.nanoTime();
+          continue;
         }
-        curNS = System.nanoTime();
-        continue;
+        break;
       }
-      break;
+      return targetNS;
     }
   }
 }

Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java Thu Nov  8 21:29:43 2012
@@ -96,8 +96,6 @@ public class MockDirectoryWrapper extend
   // is made to delete an open file, we enroll it here.
   private Set<String> openFilesDeleted;
 
-  final RateLimiter rateLimiter;
-
   private synchronized void init() {
     if (openFiles == null) {
       openFiles = new HashMap<String,Integer>();
@@ -120,19 +118,6 @@ public class MockDirectoryWrapper extend
         .mBitsToBytes(40 + randomState.nextInt(10)), 5 + randomState.nextInt(5), null);
     // force wrapping of lockfactory
     this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory());
-
-    // 2% of the time use rate limiter
-    if (randomState.nextInt(50) == 17) {
-      // Use RateLimiter
-      double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5);
-      if (LuceneTestCase.VERBOSE) {
-        System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
-      }
-      rateLimiter = new RateLimiter(maxMBPerSec);
-    } else {
-      rateLimiter = null;
-    }
-
     init();
   }
 
@@ -447,7 +432,6 @@ public class MockDirectoryWrapper extend
         ramdir.fileMap.put(name, file);
       }
     }
-    
     //System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
     IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
     addFileHandle(io, name, Handle.Output);
@@ -455,7 +439,7 @@ public class MockDirectoryWrapper extend
     
     // throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
     if (throttling == Throttling.ALWAYS || 
-        (throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
+        (throttling == Throttling.SOMETIMES && randomState.nextInt(50) == 0) && !(delegate instanceof RateLimitedDirectoryWrapper)) {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: throttling indexOutput");
       }

Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java (original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java Thu Nov  8 21:29:43 2012
@@ -77,11 +77,6 @@ public class MockIndexOutputWrapper exte
   public void writeBytes(byte[] b, int offset, int len) throws IOException {
     long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
     long realUsage = 0;
-
-    if (dir.rateLimiter != null && len >= 1000) {
-      dir.rateLimiter.pause(len);
-    }
-
     // If MockRAMDir crashed since we were opened, then
     // don't write anything:
     if (dir.crashed)

Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1407271&r1=1407270&r2=1407271&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Thu Nov  8 21:29:43 2012
@@ -38,6 +38,7 @@ import org.apache.lucene.search.*;
 import org.apache.lucene.search.FieldCache.CacheEntry;
 import org.apache.lucene.search.QueryUtils.FCInvisibleMultiReader;
 import org.apache.lucene.store.*;
+import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
 import org.apache.lucene.util.FieldCacheSanityChecker.Insanity;
 import org.junit.*;
@@ -948,6 +949,27 @@ public abstract class LuceneTestCase ext
     if (rarely(random)) {
       directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble());
     }
+    
+    if (rarely(random)) { 
+      final double maxMBPerSec = 10 + 5*(random.nextDouble()-0.5);
+      if (LuceneTestCase.VERBOSE) {
+        System.out.println("LuceneTestCase: will rate limit output IndexOutput to " + maxMBPerSec + " MB/sec");
+      }
+      final RateLimitedDirectoryWrapper rateLimitedDirectoryWrapper = new RateLimitedDirectoryWrapper(directory);
+      switch (random.nextInt(10)) {
+        case 3: // sometimes rate limit on flush
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
+          break;
+        case 2: // sometimes rate limit flush & merge
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
+          break;
+        default:
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
+      }
+      directory =  rateLimitedDirectoryWrapper;
+      
+    }
 
     if (bare) {
       BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory);
@@ -955,6 +977,7 @@ public abstract class LuceneTestCase ext
       return base;
     } else {
       MockDirectoryWrapper mock = new MockDirectoryWrapper(random, directory);
+      
       mock.setThrottling(TEST_THROTTLING);
       closeAfterSuite(new CloseableDirectory(mock, suiteFailureMarker));
       return mock;