You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/11/08 22:24:44 UTC

svn commit: r1407268 - in /lucene/dev/trunk/lucene: ./ core/src/java/org/apache/lucene/store/ test-framework/src/java/org/apache/lucene/store/ test-framework/src/java/org/apache/lucene/util/

Author: simonw
Date: Thu Nov  8 21:24:43 2012
New Revision: 1407268

URL: http://svn.apache.org/viewvc?rev=1407268&view=rev
Log:
LUCENE-4537: Separate RateLimiter from Directory

Added:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java   (with props)
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java   (with props)
Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1407268&r1=1407267&r2=1407268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Thu Nov  8 21:24:43 2012
@@ -95,7 +95,11 @@ API Changes
 
 * LUCENE-4520: ValueSource.getSortField no longer throws IOExceptions
   (Alan Woodward)
-
+  
+* LUCENE-4537: RateLimiter is now separated from FSDirectory and exposed via
+  RateLimitingDirectoryWrapper. Any Directory can now be rate-limited.
+  (Simon Willnauer)  
+  
 Bug Fixes
 
 * LUCENE-1822: BaseFragListBuilder hard-coded 6 char margin is too naive.

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java?rev=1407268&r1=1407267&r2=1407268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java Thu Nov  8 21:24:43 2012
@@ -123,9 +123,6 @@ public abstract class FSDirectory extend
   protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
   private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
 
-  // null means no limit
-  private volatile RateLimiter mergeWriteRateLimiter;
-
   // returns the canonical version of the directory, creating it if it doesn't exist.
   private static File getCanonicalPath(File file) throws IOException {
     return new File(file.getCanonicalPath());
@@ -286,51 +283,7 @@ public abstract class FSDirectory extend
     ensureOpen();
 
     ensureCanWrite(name);
-    return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
-  }
-
-  /** Sets the maximum (approx) MB/sec allowed by all write
-   *  IO performed by merging.  Pass null to have no limit.
-   *
-   *  <p><b>NOTE</b>: if merges are already running there is
-   *  no guarantee this new rate will apply to them; it will
-   *  only apply for certain to new merges.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    if (mbPerSec == null) {
-      if (limiter != null) {
-        limiter.setMbPerSec(Double.MAX_VALUE);
-        mergeWriteRateLimiter = null;
-      }
-    } else if (limiter != null) {
-      limiter.setMbPerSec(mbPerSec);
-    } else {
-      mergeWriteRateLimiter = new RateLimiter(mbPerSec);
-    }
-  }
-
-  /**
-   * Sets the rate limiter to be used to limit (approx) MB/sec allowed
-   * by all IO performed when merging. Pass null to have no limit.
-   *
-   * <p>Passing an instance of rate limiter compared to setting it using
-   * {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
-   * instance across several directories globally limiting IO when merging
-   * across them.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
-    this.mergeWriteRateLimiter = mergeWriteRateLimiter;
-  }
-
-  /** See {@link #setMaxMergeWriteMBPerSec}.
-   *
-   * @lucene.experimental */
-  public Double getMaxMergeWriteMBPerSec() {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    return limiter == null ? null : limiter.getMbPerSec();
+    return new FSIndexOutput(this, name);
   }
 
   protected void ensureCanWrite(String name) throws IOException {
@@ -504,23 +457,18 @@ public abstract class FSDirectory extend
     private final String name;
     private final RandomAccessFile file;
     private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
-    private final RateLimiter rateLimiter;
     
-    public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException {
+    public FSIndexOutput(FSDirectory parent, String name) throws IOException {
       this.parent = parent;
       this.name = name;
       file = new RandomAccessFile(new File(parent.directory, name), "rw");
       isOpen = true;
-      this.rateLimiter = rateLimiter;
     }
 
     /** output methods: */
     @Override
     public void flushBuffer(byte[] b, int offset, int size) throws IOException {
       assert isOpen;
-      if (rateLimiter != null) {
-        rateLimiter.pause(size);
-      }
       file.write(b, offset, size);
     }
     

Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java?rev=1407268&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java Thu Nov  8 21:24:43 2012
@@ -0,0 +1,223 @@
+package org.apache.lucene.store;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.store.IOContext.Context;
+
+/**
+ * 
+ * A {@link Directory} wrapper that allows {@link IndexOutput} rate limiting using
+ * {@link Context IO context} specific {@link RateLimiter rate limiters}.
+ * 
+ *  @see #setRateLimiter(RateLimiter, Context)
+ * @lucene.experimental
+ */
+public final class RateLimitedDirectoryWrapper extends Directory {
+  
+  private final Directory delegate;
+  // we need to be volatile here to make sure we see all the values that are set
+  // / modified concurrently
+  private volatile RateLimiter[] contextRateLimiters = new RateLimiter[IOContext.Context
+      .values().length];
+  
+  public RateLimitedDirectoryWrapper(Directory wrapped) {
+    this.delegate = wrapped;
+  }
+  
+  public String[] listAll() throws IOException {
+    ensureOpen();
+    return delegate.listAll();
+  }
+  
+  public boolean fileExists(String name) throws IOException {
+    ensureOpen();
+    return delegate.fileExists(name);
+  }
+  
+  public void deleteFile(String name) throws IOException {
+    ensureOpen();
+    delegate.deleteFile(name);
+  }
+  
+  public long fileLength(String name) throws IOException {
+    ensureOpen();
+    return delegate.fileLength(name);
+  }
+  
+  public IndexOutput createOutput(String name, IOContext context)
+      throws IOException {
+    ensureOpen();
+    final IndexOutput output = delegate.createOutput(name, context);
+    final RateLimiter limiter = getRateLimiter(context.context);
+    if (limiter != null) {
+      return new RateLimitedIndexOutput(limiter, output);
+    }
+    return output;
+  }
+  
+  public void sync(Collection<String> names) throws IOException {
+    ensureOpen();
+    delegate.sync(names);
+  }
+  
+  public IndexInput openInput(String name, IOContext context)
+      throws IOException {
+    ensureOpen();
+    return delegate.openInput(name, context);
+  }
+  
+  public void close() throws IOException {
+    isOpen = false;
+    delegate.close();
+  }
+  
+  public IndexInputSlicer createSlicer(String name, IOContext context)
+      throws IOException {
+    ensureOpen();
+    return delegate.createSlicer(name, context);
+  }
+  
+  @Override
+  public Lock makeLock(String name) {
+    ensureOpen();
+    return delegate.makeLock(name);
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    ensureOpen();
+    delegate.clearLock(name);
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    ensureOpen();
+    delegate.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    ensureOpen();
+    return delegate.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    ensureOpen();
+    return delegate.getLockID();
+  }
+
+  @Override
+  public String toString() {
+    return "RateLimitedDirectoryWrapper(" + delegate.toString() + ")";
+  }
+
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    ensureOpen();
+    delegate.copy(to, src, dest, context);
+  }
+  
+  private RateLimiter getRateLimiter(IOContext.Context context) {
+    assert context != null;
+    return contextRateLimiters[context.ordinal()];
+  }
+  
+  /**
+   * Sets the maximum (approx) MB/sec allowed by all write IO performed by
+   * {@link IndexOutput} created with the given {@link IOContext.Context}. Pass
+   * <code>null</code> to have no limit.
+   * 
+   * <p>
+   * <b>NOTE</b>: For already created {@link IndexOutput} instances there is no
+   * guarantee this new rate will apply to them; it will only be guaranteed to
+   * apply for new created {@link IndexOutput} instances.
+   * <p>
+   * <b>NOTE</b>: this is an optional operation and might not be respected by
+   * all Directory implementations. Currently only {@link FSDirectory buffered}
+   * Directory implementations use rate-limiting.
+   * 
+   * @throws IllegalArgumentException
+   *           if context is <code>null</code>
+   * @throws AlreadyClosedException if the {@link Directory} is already closed
+   * @lucene.experimental
+   */
+  public void setMaxWriteMBPerSec(Double mbPerSec, IOContext.Context context) {
+    ensureOpen();
+    if (context == null) {
+      throw new IllegalArgumentException("Context must not be null");
+    }
+    final int ord = context.ordinal();
+    final RateLimiter limiter = contextRateLimiters[ord];
+    if (mbPerSec == null) {
+      if (limiter != null) {
+        limiter.setMbPerSec(Double.MAX_VALUE);
+        contextRateLimiters[ord] = null;
+      }
+    } else if (limiter != null) {
+      limiter.setMbPerSec(mbPerSec);
+      contextRateLimiters[ord] = limiter; // cross the mem barrier again
+    } else {
+      contextRateLimiters[ord] = new RateLimiter.SimpleRateLimiter(mbPerSec);
+    }
+  }
+  
+  /**
+   * Sets the rate limiter to be used to limit (approx) MB/sec allowed by all IO
+   * performed with the given {@link Context context}. Pass <code>null</code> to
+   * have no limit.
+   * 
+   * <p>
+   * Passing an instance of rate limiter compared to setting it using
+   * {@link #setMaxWriteMBPerSec(Double, org.apache.lucene.store.IOContext.Context)}
+   * allows to use the same limiter instance across several directories globally
+   * limiting IO across them.
+   * 
+   * @throws IllegalArgumentException
+   *           if context is <code>null</code>
+   * @throws AlreadyClosedException if the {@link Directory} is already closed           
+   * @lucene.experimental
+   */
+  public void setRateLimiter(RateLimiter mergeWriteRateLimiter,
+      Context context) {
+    ensureOpen();
+    if (context == null) {
+      throw new IllegalArgumentException("Context must not be null");
+    }
+    contextRateLimiters[context.ordinal()] = mergeWriteRateLimiter;
+  }
+  
+  /**
+   * See {@link #setMaxWriteMBPerSec}.
+   * 
+   * @throws IllegalArgumentException
+   *           if context is <code>null</code>
+   * @throws AlreadyClosedException if the {@link Directory} is already closed
+   * @lucene.experimental
+   */
+  public Double getMaxWriteMBPerSec(IOContext.Context context) {
+    ensureOpen();
+    if (context == null) {
+      throw new IllegalArgumentException("Context must not be null");
+    }
+    RateLimiter limiter = getRateLimiter(context);
+    return limiter == null ? null : limiter.getMbPerSec();
+  }
+  
+}

Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java?rev=1407268&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java Thu Nov  8 21:24:43 2012
@@ -0,0 +1,76 @@
+package org.apache.lucene.store;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+/**
+ * A {@link RateLimiter rate limiting} {@link IndexOutput}
+ * 
+ * @lucene.internal
+ */
+final class RateLimitedIndexOutput extends BufferedIndexOutput {
+  
+  private final IndexOutput delegate;
+  private final BufferedIndexOutput bufferedDelegate;
+  private final RateLimiter rateLimiter;
+
+  RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
+    // TODO should we make buffer size configurable
+    if (delegate instanceof BufferedIndexOutput) {
+      bufferedDelegate = (BufferedIndexOutput) delegate;
+      this.delegate = delegate;
+    } else {
+      this.delegate = delegate;
+      bufferedDelegate = null;
+    }
+    this.rateLimiter = rateLimiter;
+  }
+  
+  @Override
+  protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
+    rateLimiter.pause(len);
+    if (bufferedDelegate != null) {
+      bufferedDelegate.flushBuffer(b, offset, len);
+    } else {
+      delegate.writeBytes(b, offset, len);
+    }
+    
+  }
+  
+  @Override
+  public long length() throws IOException {
+    return delegate.length();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    try {
+      super.flush();
+    } finally { 
+      delegate.flush();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close();
+    } finally {
+      delegate.close();
+    }
+  }
+}

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java?rev=1407268&r1=1407267&r2=1407268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java Thu Nov  8 21:24:43 2012
@@ -19,75 +19,102 @@ package org.apache.lucene.store;
 
 import org.apache.lucene.util.ThreadInterruptedException;
 
-/** Simple class to rate limit IO.  Typically it's shared
- *  across multiple IndexInputs or IndexOutputs (for example
+/** Abstract base class to rate limit IO.  Typically implementations are
+ *  shared across multiple IndexInputs or IndexOutputs (for example
  *  those involved all merging).  Those IndexInputs and
  *  IndexOutputs would call {@link #pause} whenever they
  *  want to read bytes or write bytes. */
-
-public class RateLimiter {
-  private volatile double mbPerSec;
-  private volatile double nsPerByte;
-  private volatile long lastNS;
-
-  // TODO: we could also allow eg a sub class to dynamically
-  // determine the allowed rate, eg if an app wants to
-  // change the allowed rate over time or something
-
-  /** mbPerSec is the MB/sec max IO rate */
-  public RateLimiter(double mbPerSec) {
-    setMbPerSec(mbPerSec);
-  }
+public abstract class RateLimiter {
 
   /**
    * Sets an updated mb per second rate limit.
    */
-  public void setMbPerSec(double mbPerSec) {
-    this.mbPerSec = mbPerSec;
-    nsPerByte = 1000000000. / (1024*1024*mbPerSec);
-  }
-
+  public abstract void setMbPerSec(double mbPerSec);
   /**
    * The current mb per second rate limit.
    */
-  public double getMbPerSec() {
-    return this.mbPerSec;
-  }
-
+  public abstract double getMbPerSec();
+  
   /** Pauses, if necessary, to keep the instantaneous IO
-   *  rate at or below the target. NOTE: multiple threads
-   *  may safely use this, however the implementation is
-   *  not perfectly thread safe but likely in practice this
-   *  is harmless (just means in some rare cases the rate
-   *  might exceed the target).  It's best to call this
-   *  with a biggish count, not one byte at a time. */
-  public void pause(long bytes) {
-    if (bytes == 1) {
-      return;
+   *  rate at or below the target. 
+   *  <p>
+   *  Note: the implementation is thread-safe
+   *  </p>
+   *  @return the pause time in nano seconds 
+   * */
+  public abstract long pause(long bytes);
+  
+  /**
+   * Simple class to rate limit IO.
+   */
+  public static class SimpleRateLimiter extends RateLimiter {
+    private volatile double mbPerSec;
+    private volatile double nsPerByte;
+    private volatile long lastNS;
+
+    // TODO: we could also allow eg a sub class to dynamically
+    // determine the allowed rate, eg if an app wants to
+    // change the allowed rate over time or something
+
+    /** mbPerSec is the MB/sec max IO rate */
+    public SimpleRateLimiter(double mbPerSec) {
+      setMbPerSec(mbPerSec);
+    }
+
+    /**
+     * Sets an updated mb per second rate limit.
+     */
+    public void setMbPerSec(double mbPerSec) {
+      this.mbPerSec = mbPerSec;
+      nsPerByte = 1000000000. / (1024*1024*mbPerSec);
+      
     }
 
-    // TODO: this is purely instantaneous rate; maybe we
-    // should also offer decayed recent history one?
-    final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
-    long curNS = System.nanoTime();
-    if (lastNS < curNS) {
-      lastNS = curNS;
+    /**
+     * The current mb per second rate limit.
+     */
+    public double getMbPerSec() {
+      return this.mbPerSec;
     }
+    
+    /** Pauses, if necessary, to keep the instantaneous IO
+     *  rate at or below the target. NOTE: multiple threads
+     *  may safely use this, however the implementation is
+     *  not perfectly thread safe but likely in practice this
+     *  is harmless (just means in some rare cases the rate
+     *  might exceed the target).  It's best to call this
+     *  with a biggish count, not one byte at a time.
+     *  @return the pause time in nano seconds 
+     * */
+    public long pause(long bytes) {
+      if (bytes == 1) {
+        return 0;
+      }
+
+      // TODO: this is purely instantaneous rate; maybe we
+      // should also offer decayed recent history one?
+      final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
+      long curNS = System.nanoTime();
+      if (lastNS < curNS) {
+        lastNS = curNS;
+      }
 
-    // While loop because Thread.sleep doesn't always sleep
-    // enough:
-    while(true) {
-      final long pauseNS = targetNS - curNS;
-      if (pauseNS > 0) {
-        try {
-          Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
+      // While loop because Thread.sleep doesn't always sleep
+      // enough:
+      while(true) {
+        final long pauseNS = targetNS - curNS;
+        if (pauseNS > 0) {
+          try {
+            Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+          curNS = System.nanoTime();
+          continue;
         }
-        curNS = System.nanoTime();
-        continue;
+        break;
       }
-      break;
+      return targetNS;
     }
   }
 }

Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1407268&r1=1407267&r2=1407268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java Thu Nov  8 21:24:43 2012
@@ -96,8 +96,6 @@ public class MockDirectoryWrapper extend
   // is made to delete an open file, we enroll it here.
   private Set<String> openFilesDeleted;
 
-  final RateLimiter rateLimiter;
-
   private synchronized void init() {
     if (openFiles == null) {
       openFiles = new HashMap<String,Integer>();
@@ -120,19 +118,6 @@ public class MockDirectoryWrapper extend
         .mBitsToBytes(40 + randomState.nextInt(10)), 5 + randomState.nextInt(5), null);
     // force wrapping of lockfactory
     this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory());
-
-    // 2% of the time use rate limiter
-    if (randomState.nextInt(50) == 17) {
-      // Use RateLimiter
-      double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5);
-      if (LuceneTestCase.VERBOSE) {
-        System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
-      }
-      rateLimiter = new RateLimiter(maxMBPerSec);
-    } else {
-      rateLimiter = null;
-    }
-
     init();
   }
 
@@ -447,7 +432,6 @@ public class MockDirectoryWrapper extend
         ramdir.fileMap.put(name, file);
       }
     }
-    
     //System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
     IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
     addFileHandle(io, name, Handle.Output);
@@ -455,7 +439,7 @@ public class MockDirectoryWrapper extend
     
     // throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
     if (throttling == Throttling.ALWAYS || 
-        (throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
+        (throttling == Throttling.SOMETIMES && randomState.nextInt(50) == 0) && !(delegate instanceof RateLimitedDirectoryWrapper)) {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: throttling indexOutput");
       }

Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java?rev=1407268&r1=1407267&r2=1407268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java Thu Nov  8 21:24:43 2012
@@ -77,11 +77,6 @@ public class MockIndexOutputWrapper exte
   public void writeBytes(byte[] b, int offset, int len) throws IOException {
     long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
     long realUsage = 0;
-
-    if (dir.rateLimiter != null && len >= 1000) {
-      dir.rateLimiter.pause(len);
-    }
-
     // If MockRAMDir crashed since we were opened, then
     // don't write anything:
     if (dir.crashed)

Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1407268&r1=1407267&r2=1407268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Thu Nov  8 21:24:43 2012
@@ -37,6 +37,7 @@ import org.apache.lucene.search.*;
 import org.apache.lucene.search.FieldCache.CacheEntry;
 import org.apache.lucene.search.QueryUtils.FCInvisibleMultiReader;
 import org.apache.lucene.store.*;
+import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
 import org.apache.lucene.util.FieldCacheSanityChecker.Insanity;
 import org.junit.*;
@@ -947,6 +948,27 @@ public abstract class LuceneTestCase ext
     if (rarely(random)) {
       directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble());
     }
+    
+    if (rarely(random)) { 
+      final double maxMBPerSec = 10 + 5*(random.nextDouble()-0.5);
+      if (LuceneTestCase.VERBOSE) {
+        System.out.println("LuceneTestCase: will rate limit output IndexOutput to " + maxMBPerSec + " MB/sec");
+      }
+      final RateLimitedDirectoryWrapper rateLimitedDirectoryWrapper = new RateLimitedDirectoryWrapper(directory);
+      switch (random.nextInt(10)) {
+        case 3: // sometimes rate limit on flush
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
+          break;
+        case 2: // sometimes rate limit flush & merge
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
+          break;
+        default:
+          rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
+      }
+      directory =  rateLimitedDirectoryWrapper;
+      
+    }
 
     if (bare) {
       BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory);
@@ -954,6 +976,7 @@ public abstract class LuceneTestCase ext
       return base;
     } else {
       MockDirectoryWrapper mock = new MockDirectoryWrapper(random, directory);
+      
       mock.setThrottling(TEST_THROTTLING);
       closeAfterSuite(new CloseableDirectory(mock, suiteFailureMarker));
       return mock;