You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2016/04/19 19:51:54 UTC

[1/3] accumulo git commit: ACCUMULO-4187: Added rate limiting for major compactions.

Repository: accumulo
Updated Branches:
  refs/heads/master 4f45908c0 -> c8e6fe749


http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
index 792e199..ce49f36 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
@@ -225,7 +225,7 @@ public class MockTableOperationsTest {
     fs.delete(tempFile, true);
     fs.mkdirs(failures);
     fs.mkdirs(tempFile.getParent());
-    FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, AccumuloConfiguration.getDefaultConfiguration());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, null, AccumuloConfiguration.getDefaultConfiguration());
     writer.startDefaultLocalityGroup();
     List<Pair<Key,Value>> keyVals = new ArrayList<Pair<Key,Value>>();
     for (int i = 0; i < 5; i++) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
index ca388e5..0fb4bd6 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
@@ -80,7 +80,7 @@ public class BloomFilterLayerLookupTest {
     // get output file name
     String suffix = FileOperations.getNewFileExtension(acuconf);
     String fname = new File(tempDir.getRoot(), testName + "." + suffix).getAbsolutePath();
-    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
+    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
 
     // write data to file
     long t1 = System.currentTimeMillis();
@@ -96,7 +96,7 @@ public class BloomFilterLayerLookupTest {
     bmfw.close();
 
     t1 = System.currentTimeMillis();
-    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
+    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
     t2 = System.currentTimeMillis();
     LOG.debug("Opened " + fname + " in " + (t2 - t1));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
index 3fdeb8a..e8ccf35 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
@@ -51,7 +51,7 @@ public class FileOperationsTest {
       Configuration conf = new Configuration();
       FileSystem fs = FileSystem.getLocal(conf);
       AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
-      writer = fileOperations.openWriter(filename, fs, conf, acuconf);
+      writer = fileOperations.openWriter(filename, fs, conf, null, acuconf);
       writer.close();
     } catch (Exception ex) {
       caughtException = true;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 3eadc06..ad29a69 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -56,7 +56,7 @@ public class CreateCompatTestFile {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", conf, AccumuloConfiguration.getDefaultConfiguration());
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null, conf, AccumuloConfiguration.getDefaultConfiguration());
     RFile.Writer writer = new RFile.Writer(_cbw, 1000);
 
     writer.startNewLocalityGroup("lg1", ncfs(nf("cf_", 1), nf("cf_", 2)));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 66978dd..391bea1 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +55,7 @@ public class MultiLevelIndexTest extends TestCase {
     AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", CachedConfiguration.getInstance(), aconf);
 
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index a442347..5f66503 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -86,6 +86,7 @@ import com.google.common.hash.HashCode;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.Bytes;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 
 public class RFileTest {
 
@@ -224,7 +225,7 @@ public class RFileTest {
     public void openWriter(boolean startDLG, int blockSize) throws IOException {
       baos = new ByteArrayOutputStream();
       dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
+      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", conf, accumuloConfiguration);
 
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
       Sampler sampler = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
new file mode 100644
index 0000000..9574d36
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+
+public class MockRateLimiter implements RateLimiter {
+  private final AtomicLong permitsAcquired = new AtomicLong();
+
+  @Override
+  public long getRate() {
+    return 0;
+  }
+
+  @Override
+  public void acquire(long permits) {
+    permitsAcquired.addAndGet(permits);
+  }
+
+  public long getPermitsAcquired() {
+    return permitsAcquired.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
new file mode 100644
index 0000000..6baff87
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RateLimitedInputStreamTest {
+
+  @Test
+  public void permitsAreProperlyAcquired() throws Exception {
+    Random randGen = new Random();
+    MockRateLimiter rateLimiter = new MockRateLimiter();
+    long bytesRetrieved = 0;
+    try (InputStream is = new RateLimitedInputStream(new RandomInputStream(), rateLimiter)) {
+      for (int i = 0; i < 100; ++i) {
+        int count = Math.abs(randGen.nextInt()) % 65536;
+        int countRead = is.read(new byte[count]);
+        Assert.assertEquals(count, countRead);
+        bytesRetrieved += count;
+      }
+    }
+    Assert.assertEquals(bytesRetrieved, rateLimiter.getPermitsAcquired());
+  }
+
+  private static class RandomInputStream extends InputStream implements Seekable {
+    private final Random r = new Random();
+
+    @Override
+    public int read() throws IOException {
+      return r.nextInt() & 0xff;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      throw new UnsupportedOperationException("Not supported yet."); // seek is not supported by this test stub
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      throw new UnsupportedOperationException("Not supported yet."); // getPos is not supported by this test stub
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException("Not supported yet."); // seekToNewSource is not supported by this test stub
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
new file mode 100644
index 0000000..9e12354
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RateLimitedOutputStreamTest {
+
+  @Test
+  public void permitsAreProperlyAcquired() throws Exception {
+    Random randGen = new Random();
+    MockRateLimiter rateLimiter = new MockRateLimiter();
+    long bytesWritten = 0;
+    try (RateLimitedOutputStream os = new RateLimitedOutputStream(new NullOutputStream(), rateLimiter)) {
+      for (int i = 0; i < 100; ++i) {
+        byte[] bytes = new byte[Math.abs(randGen.nextInt() % 65536)];
+        os.write(bytes);
+        bytesWritten += bytes.length;
+      }
+      Assert.assertEquals(bytesWritten, os.position());
+    }
+    Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired());
+  }
+
+  public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
+    public NullOutputStream() {
+      super(new CountingOutputStream(ByteStreams.nullOutputStream()));
+    }
+
+    @Override
+    public long position() throws IOException {
+      return ((CountingOutputStream) out).getCount();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 2656d4b..cf55e35 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -642,7 +642,7 @@ public class BulkImporter {
     String filename = file.toString();
     // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
     FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
-    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), context.getConfiguration());
+    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), null, context.getConfiguration());
     try {
       Text row = startRow;
       if (row == null)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 4378f2c..e58a79c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -465,7 +465,7 @@ public class Initialize implements KeywordExecutable {
       createEntriesForTablet(sorted, tablet);
     }
     FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem();
-    FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+    FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, AccumuloConfiguration.getDefaultConfiguration());
     tabletWriter.startDefaultLocalityGroup();
 
     for (Entry<Key,Value> entry : sorted.entrySet()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 04e17d5..9291808 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -132,7 +132,7 @@ public class FileUtil {
 
       outFiles.add(newMapFile);
       FileSystem ns = fs.getVolumeByPath(new Path(newMapFile)).getFileSystem();
-      FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), acuConf);
+      FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), null, acuConf);
       writer.startDefaultLocalityGroup();
       List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
 
@@ -404,7 +404,7 @@ public class FileUtil {
           reader = FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf);
         else
           reader = FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
-              ns, ns.getConf(), acuConf);
+              ns, ns.getConf(), null, acuConf);
 
         while (reader.hasTop()) {
           Key key = reader.getTopKey();
@@ -428,7 +428,7 @@ public class FileUtil {
         readers.add(FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf));
       else
         readers.add(FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
-            ns, ns.getConf(), acuConf));
+            ns, ns.getConf(), null, acuConf));
 
     }
     return numKeys;
@@ -445,7 +445,7 @@ public class FileUtil {
       FileSKVIterator reader = null;
       FileSystem ns = fs.getVolumeByPath(mapfile.path()).getFileSystem();
       try {
-        reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), acuConf);
+        reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), null, acuConf);
 
         Key firstKey = reader.getFirstKey();
         if (firstKey != null) {
@@ -479,7 +479,7 @@ public class FileUtil {
     for (FileRef ref : mapFiles) {
       Path path = ref.path();
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), acuConf);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, acuConf);
 
       try {
         if (!reader.hasTop())

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 0116101..646e400 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -113,7 +113,7 @@ public class BulkImporterTest {
     EasyMock.replay(context);
     String file = "target/testFile.rf";
     fs.delete(new Path(file), true);
-    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), context.getConfiguration());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), null, context.getConfiguration());
     writer.startDefaultLocalityGroup();
     Value empty = new Value(new byte[] {});
     writer.append(new Key("a", "cf", "cq"), empty);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 2928418..c28d060 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -316,7 +316,7 @@ public class FileManager {
         Path path = new Path(file);
         FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
         // log.debug("Opening "+file + " path " + path);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(),
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), null,
             context.getServerConfigurationFactory().getTableConfiguration(tablet), dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index ca4719d..5219e33 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -632,7 +632,7 @@ public class InMemoryMap {
         Configuration conf = CachedConfiguration.getInstance();
         FileSystem fs = FileSystem.getLocal(conf);
 
-        reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance());
+        reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, null, SiteConfiguration.getInstance());
         if (iflag != null)
           reader.setInterruptFlag(iflag);
 
@@ -804,7 +804,7 @@ public class InMemoryMap {
           siteConf = createSampleConfig(siteConf);
         }
 
-        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, siteConf);
+        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, null, siteConf);
 
         InterruptibleIterator iter = map.skvIterator(null);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index f16f726..1523c55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -256,8 +256,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
 
 public class TabletServer extends AccumuloServerContext implements Runnable {
+
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
@@ -330,7 +333,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     super(confFactory);
     this.confFactory = confFactory;
     this.fs = fs;
-    AccumuloConfiguration aconf = getConfiguration();
+    final AccumuloConfiguration aconf = getConfiguration();
     Instance instance = getInstance();
     log.info("Version " + Constants.VERSION);
     log.info("Instance " + instance.getInstanceID());
@@ -3089,4 +3092,28 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     bulkImportStatus.removeBulkImportStatus(files);
   }
 
+  private static final String MAJC_READ_LIMITER_KEY = "tserv_majc_read";
+  private static final String MAJC_WRITE_LIMITER_KEY = "tserv_majc_write";
+  private final SharedRateLimiterFactory.RateProvider rateProvider = new SharedRateLimiterFactory.RateProvider() {
+    @Override
+    public long getDesiredRate() {
+      return getConfiguration().getMemoryInBytes(Property.TSERV_MAJC_THROUGHPUT);
+    }
+  };
+
+  /**
+   * Get the {@link RateLimiter} for reads during major compactions on this tserver. All reads performed during major compactions are throttled to conform to
+   * this RateLimiter.
+   */
+  public final RateLimiter getMajorCompactionReadLimiter() {
+    return SharedRateLimiterFactory.getInstance().create(MAJC_READ_LIMITER_KEY, rateProvider);
+  }
+
+  /**
+   * Get the {@link RateLimiter} for writes during major compactions on this tserver. All writes performed during major compactions are throttled to conform to this
+   * RateLimiter.
+   */
+  public final RateLimiter getMajorCompactionWriteLimiter() {
+    return SharedRateLimiterFactory.getInstance().create(MAJC_WRITE_LIMITER_KEY, rateProvider);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 26b1b12..79c0c4f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -77,7 +77,7 @@ public class MajorCompactionRequest implements Cloneable {
     // @TODO ensure these files are always closed?
     FileOperations fileFactory = FileOperations.getInstance();
     FileSystem ns = volumeManager.getVolumeByPath(ref.path()).getFileSystem();
-    FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), tableConfig);
+    FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), null, tableConfig);
     return openReader;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index 2ef4a34..bde5be0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -81,6 +82,10 @@ public class Compactor implements Callable<CompactionStats> {
     boolean isCompactionEnabled();
 
     IteratorScope getIteratorScope();
+
+    RateLimiter getReadLimiter();
+
+    RateLimiter getWriteLimiter();
   }
 
   private final Map<FileRef,DataFileValue> filesToCompact;
@@ -192,7 +197,7 @@ public class Compactor implements Callable<CompactionStats> {
     try {
       FileOperations fileFactory = FileOperations.getInstance();
       FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem();
-      mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), acuTableConf);
+      mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), env.getWriteLimiter(), acuTableConf);
 
       Map<String,Set<ByteSequence>> lGroups;
       try {
@@ -280,7 +285,7 @@ public class Compactor implements Callable<CompactionStats> {
         FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
         FileSKVIterator reader;
 
-        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), acuTableConf);
+        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), env.getReadLimiter(), acuTableConf);
 
         readers.add(reader);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index e3f8193..6bd2545 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.problems.ProblemReport;
@@ -69,6 +70,16 @@ public class MinorCompactor extends Compactor {
       public IteratorScope getIteratorScope() {
         return IteratorScope.minc;
       }
+
+      @Override
+      public RateLimiter getReadLimiter() { // this minor-compaction environment supplies no read limiter
+        return null; // presumably null means "unthrottled" to the file layer -- TODO confirm in FileOperations
+      }
+
+      @Override
+      public RateLimiter getWriteLimiter() { // this minor-compaction environment supplies no write limiter
+        return null; // presumably null means "unthrottled" to the file layer -- TODO confirm in FileOperations
+      }
     }, Collections.<IteratorSetting> emptyList(), mincReason.ordinal(), tableConfig);
     this.tabletServer = tabletServer;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index cf99dbd..fb47afc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.tserver.tablet;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -153,6 +152,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import static java.util.Objects.requireNonNull;
 
 /**
  *
@@ -1583,7 +1584,7 @@ public class Tablet implements TabletCommitter {
     for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
       FileRef file = entry.getKey();
       FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
-      FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), this.getTableConfiguration());
+      FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), null, this.getTableConfiguration());
       try {
         Key first = openReader.getFirstKey();
         Key last = openReader.getLastKey();
@@ -1807,8 +1808,8 @@ public class Tablet implements TabletCommitter {
         AccumuloConfiguration tableConf = createTableConfiguration(tableConfiguration, plan);
 
         Span span = Trace.start("compactFiles");
-        try {
 
+        try {
           CompactionEnv cenv = new CompactionEnv() {
             @Override
             public boolean isCompactionEnabled() {
@@ -1819,6 +1820,17 @@ public class Tablet implements TabletCommitter {
             public IteratorScope getIteratorScope() {
               return IteratorScope.majc;
             }
+
+            @Override
+            public RateLimiter getReadLimiter() { // read limiter for this majc-scope compaction environment
+              return getTabletServer().getMajorCompactionReadLimiter(); // delegates to the owning tablet server
+            }
+
+            @Override
+            public RateLimiter getWriteLimiter() { // write limiter for this majc-scope compaction environment
+              return getTabletServer().getMajorCompactionWriteLimiter(); // delegates to the owning tablet server
+            }
+
           };
 
           HashMap<FileRef,DataFileValue> copy = new HashMap<FileRef,DataFileValue>(getDatafileManager().getDatafileSizes());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
index d1cd5ce..d66863e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -149,7 +149,7 @@ public class TabletData {
       dataFiles.put(ref, dfv);
 
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), conf);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, conf);
       long maxTime = -1;
       try {
         while (reader.hasTop()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
index 7ba253f..2f90d7e 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
@@ -98,7 +98,7 @@ public class BulkImportMonitoringIT extends ConfigurableMacBase {
           fs.mkdirs(bulkFailures);
           fs.mkdirs(files);
           for (int i = 0; i < 10; i++) {
-            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
                 AccumuloConfiguration.getDefaultConfiguration());
             writer.startDefaultLocalityGroup();
             for (int j = 0x100; j < 0xfff; j += 3) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java b/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
new file mode 100644
index 0000000..6aa6930
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Random;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompactionRateLimitingIT extends ConfigurableMacBase { // IT: with TSERV_MAJC_THROUGHPUT set, a compaction must take at least the time that limit implies
+  public static final long BYTES_TO_WRITE = 10 * 1024 * 1024; // 10 MiB: the ingest loop runs until at least this many payload bytes are written
+  public static final long RATE = 1 * 1024 * 1024; // 1 MiB: throughput limit in bytes/sec, applied in configure()
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { // sets the three properties this test depends on; fsConf is not used
+    cfg.setProperty(Property.TSERV_MAJC_THROUGHPUT, RATE + "B"); // value is the byte count with a "B" suffix, i.e. "1048576B"
+    cfg.setProperty(Property.TABLE_MAJC_RATIO, "20"); // presumably keeps automatic major compactions from running before the timed one -- TODO confirm
+    cfg.setProperty(Property.TABLE_FILE_COMPRESSION_TYPE, "none"); // presumably so the on-disk size of the random payload tracks bytesWritten -- TODO confirm
+  }
+
+  @Test
+  public void majorCompactionsAreRateLimited() throws Exception { // ingest random data, flush, then time an explicit compaction
+    long bytesWritten = 0; // counts row + qualifier + value bytes only
+    String tableName = getUniqueNames(1)[0];
+    Connector conn = getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD));
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    try {
+      Random r = new Random(); // unseeded: the data differs from run to run
+      while (bytesWritten < BYTES_TO_WRITE) { // each iteration adds 32 + 32 + 1024 = 1088 counted bytes
+        byte[] rowKey = new byte[32];
+        r.nextBytes(rowKey);
+
+        byte[] qual = new byte[32];
+        r.nextBytes(qual);
+
+        byte[] value = new byte[1024];
+        r.nextBytes(value);
+
+        Mutation m = new Mutation(rowKey);
+        m.put(new byte[0], qual, value); // empty column family
+        bw.addMutation(m);
+
+        bytesWritten += rowKey.length + qual.length + value.length;
+      }
+    } finally {
+      bw.close(); // closed even if the ingest loop throws
+    }
+
+    conn.tableOperations().flush(tableName, null, null, true); // NOTE(review): final argument is presumably "wait" -- verify against TableOperations.flush
+
+    long compactionStart = System.currentTimeMillis(); // wall-clock timing around the compact() call only
+    conn.tableOperations().compact(tableName, null, null, false, true); // NOTE(review): presumably flush=false, wait=true, so the call blocks until done -- verify
+    long duration = System.currentTimeMillis() - compactionStart; // elapsed milliseconds
+    Assert.assertTrue( // passes only if the compaction took longer than the minimum time implied by RATE
+        String.format("Expected a compaction rate of no more than %,d bytes/sec, but saw a rate of %,f bytes/sec", RATE, 1000.0 * bytesWritten / duration),
+        duration > 1000L * BYTES_TO_WRITE / RATE); // threshold: 1000 * 10 MiB / 1 MiB = 10000 ms
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
index ada8504..3929dc7 100644
--- a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
@@ -69,7 +69,7 @@ public class CreateRandomRFile {
     FileSKVWriter mfw;
     try {
       FileSystem fs = FileSystem.get(conf);
-      mfw = new RFileOperations().openWriter(file, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+      mfw = new RFileOperations().openWriter(file, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
index 64a1aaf..21b2b78 100644
--- a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
@@ -58,7 +58,7 @@ public class GenerateSequentialRFile implements Runnable {
       final Configuration conf = new Configuration();
       Path p = new Path(opts.filePath);
       final FileSystem fs = p.getFileSystem(conf);
-      FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, DefaultConfiguration.getInstance());
+      FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, null, DefaultConfiguration.getInstance());
 
       writer.startDefaultLocalityGroup();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
index 259a19e..b042d97 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
@@ -123,7 +123,7 @@ public class GetFileInfoBulkIT extends ConfigurableMacBase {
           fs.mkdirs(bulkFailures);
           fs.mkdirs(files);
           for (int i = 0; i < 100; i++) {
-            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
                 AccumuloConfiguration.getDefaultConfiguration());
             writer.startDefaultLocalityGroup();
             for (int j = 0x100; j < 0xfff; j += 3) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 28112ac..0f0c292 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -1338,9 +1338,9 @@ public class ShellServerIT extends SharedMiniClusterBase {
     assertTrue(errorsDir.mkdir());
     fs.mkdirs(new Path(errorsDir.toString()));
     AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
-    FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, aconf);
+    FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, null, aconf);
     evenWriter.startDefaultLocalityGroup();
-    FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, aconf);
+    FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, null, aconf);
     oddWriter.startDefaultLocalityGroup();
     long timestamp = System.currentTimeMillis();
     Text cf = new Text("cf");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 9f79c71..8e196c4 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -218,7 +218,8 @@ public class TestIngest {
 
     if (opts.outputFile != null) {
       Configuration conf = CachedConfiguration.getInstance();
-      writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+      writer = FileOperations.getInstance()
+          .openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
       writer.startDefaultLocalityGroup();
     } else {
       bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 1abafeb..9c2ab55 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -74,17 +74,17 @@ public class BulkFileIT extends AccumuloClusterHarness {
 
     fs.delete(new Path(dir), true);
 
-    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf);
+    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, null, aconf);
     writer1.startDefaultLocalityGroup();
     writeData(writer1, 0, 333);
     writer1.close();
 
-    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf);
+    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, null, aconf);
     writer2.startDefaultLocalityGroup();
     writeData(writer2, 334, 999);
     writer2.close();
 
-    FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf);
+    FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, null, aconf);
     writer3.startDefaultLocalityGroup();
     writeData(writer3, 1000, 1999);
     writer3.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
index fda3713..93ebdb9 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -193,7 +193,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
 
       Configuration conf = CachedConfiguration.getInstance();
       DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
-      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
           .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
       assertNotNull(sample);
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
index 883a657..2b172c0 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
@@ -205,7 +205,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
 
       Configuration conf = CachedConfiguration.getInstance();
       DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
-      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
           .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
       assertNotNull(sample);
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
index 1c94ff8..9db6dd1 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -90,7 +90,7 @@ public class FastBulkImportIT extends ConfigurableMacBase {
     fs.mkdirs(bulkFailures);
     fs.mkdirs(files);
     for (int i = 0; i < 100; i++) {
-      FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+      FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
           AccumuloConfiguration.getDefaultConfiguration());
       writer.startDefaultLocalityGroup();
       for (int j = 0x100; j < 0xfff; j += 3) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index f829b73..fac8c16 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -447,7 +447,7 @@ public class CollectTabletStats {
 
     for (FileRef file : files) {
       FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf);
       Range range = new Range(ke.getPrevEndRow(), false, ke.getEndRow(), true);
       reader.seek(range, columnSet, columnSet.size() == 0 ? false : true);
       while (reader.hasTop() && !range.afterEndKey(reader.getTopKey())) {
@@ -477,7 +477,7 @@ public class CollectTabletStats {
 
     for (FileRef file : files) {
       FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
-      readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf.getConfiguration()));
+      readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf.getConfiguration()));
     }
 
     List<IterInfo> emptyIterinfo = Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
index ed31f58..3ac5b51 100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
@@ -2077,7 +2077,7 @@ public abstract class SimpleProxyBase extends SharedMiniClusterBase {
 
     // Write an RFile
     String filename = dir + "/bulk/import/rfile.rf";
-    FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), DefaultConfiguration.getInstance());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), null, DefaultConfiguration.getInstance());
     writer.startDefaultLocalityGroup();
     writer.append(new org.apache.accumulo.core.data.Key(new Text("a"), new Text("b"), new Text("c")), new Value("value".getBytes(UTF_8)));
     writer.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index c54a8e7..26ea422 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@ -83,7 +83,7 @@ public class BulkPlusOne extends BulkImportTest {
 
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
-      FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), defaultConfiguration);
+      FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, defaultConfiguration);
       f.startDefaultLocalityGroup();
       int start = rows.get(i);
       int end = rows.get(i + 1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
index 5af08ec..ec89a5a 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.test.randomwalk.Environment;
 import org.apache.accumulo.test.randomwalk.State;
@@ -52,8 +53,8 @@ public class BulkImport extends Test {
 
     public RFileBatchWriter(Configuration conf, FileSystem fs, String file) throws IOException {
       AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
-      CachableBlockFile.Writer cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, conf.getInt("io.file.buffer.size", 4096),
-          (short) conf.getInt("dfs.replication", 3), conf.getLong("dfs.block.size", 1 << 26)), "gz", conf, aconf);
+      CachableBlockFile.Writer cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(fs.create(new Path(file), false,
+          conf.getInt("io.file.buffer.size", 4096), (short) conf.getInt("dfs.replication", 3), conf.getLong("dfs.block.size", 1 << 26))), "gz", conf, aconf);
       writer = new RFile.Writer(cbw, 100000);
       writer.startDefaultLocalityGroup();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
index 2612fc9..4a60866 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
@@ -203,7 +203,7 @@ public class TableOp extends Test {
         Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
         Path fail = new Path(dir.toString() + "_fail");
         FileSystem fs = WalkingSecurity.get(state, env).getFs();
-        FileSKVWriter f = FileOperations.getInstance().openWriter(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf(),
+        FileSKVWriter f = FileOperations.getInstance().openWriter(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf(), null,
             AccumuloConfiguration.getDefaultConfiguration());
         f.startDefaultLocalityGroup();
         fs.mkdirs(fail);


[3/3] accumulo git commit: Merge branch 'ACCUMULO-4187' of github:shawnwalker/accumulo

Posted by ct...@apache.org.
Merge branch 'ACCUMULO-4187' of github:shawnwalker/accumulo


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c8e6fe74
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c8e6fe74
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c8e6fe74

Branch: refs/heads/master
Commit: c8e6fe7495496a5e6601b523df381aa086f6510b
Parents: 4f45908 783314c
Author: Christopher Tubbs <ct...@apache.org>
Authored: Tue Apr 19 13:50:48 2016 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Tue Apr 19 13:50:48 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/OfflineIterator.java       |   2 +-
 .../client/mapred/AccumuloFileOutputFormat.java |   2 +-
 .../mapreduce/AccumuloFileOutputFormat.java     |   2 +-
 .../core/client/mock/MockTableOperations.java   |   2 +-
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../accumulo/core/file/BloomFilterLayer.java    |   4 +-
 .../core/file/DispatchingFileFactory.java       |  25 +--
 .../accumulo/core/file/FileOperations.java      |  16 +-
 .../file/blockfile/impl/CachableBlockFile.java  |  53 ++++--
 .../core/file/map/MapFileOperations.java        |  20 +--
 .../accumulo/core/file/rfile/CreateEmpty.java   |   4 +-
 .../core/file/rfile/RFileOperations.java        |  31 ++--
 .../accumulo/core/file/rfile/SplitLarge.java    |   4 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java |  99 +++++-----
 .../bcfile/BoundedRangeFileInputStream.java     | 154 ----------------
 .../streams/BoundedRangeFileInputStream.java    | 152 ++++++++++++++++
 .../streams/PositionedDataOutputStream.java     |  35 ++++
 .../core/file/streams/PositionedOutput.java     |  26 +++
 .../core/file/streams/PositionedOutputs.java    |  68 +++++++
 .../file/streams/RateLimitedInputStream.java    |  69 +++++++
 .../file/streams/RateLimitedOutputStream.java   |  57 ++++++
 .../file/streams/SeekableDataInputStream.java   |  46 +++++
 .../core/util/ratelimit/GuavaRateLimiter.java   |  63 +++++++
 .../core/util/ratelimit/NullRateLimiter.java    |  33 ++++
 .../core/util/ratelimit/RateLimiter.java        |  27 +++
 .../ratelimit/SharedRateLimiterFactory.java     | 180 +++++++++++++++++++
 .../client/mock/MockTableOperationsTest.java    |   2 +-
 .../core/file/BloomFilterLayerLookupTest.java   |   4 +-
 .../accumulo/core/file/FileOperationsTest.java  |   2 +-
 .../core/file/rfile/CreateCompatTestFile.java   |   2 +-
 .../core/file/rfile/MultiLevelIndexTest.java    |   3 +-
 .../accumulo/core/file/rfile/RFileTest.java     |   3 +-
 .../core/file/streams/MockRateLimiter.java      |  38 ++++
 .../streams/RateLimitedInputStreamTest.java     |  69 +++++++
 .../streams/RateLimitedOutputStreamTest.java    |  56 ++++++
 .../accumulo/server/client/BulkImporter.java    |   2 +-
 .../apache/accumulo/server/init/Initialize.java |   2 +-
 .../apache/accumulo/server/util/FileUtil.java   |  10 +-
 .../server/client/BulkImporterTest.java         |   2 +-
 .../apache/accumulo/tserver/FileManager.java    |   2 +-
 .../apache/accumulo/tserver/InMemoryMap.java    |   4 +-
 .../apache/accumulo/tserver/TabletServer.java   |  29 ++-
 .../compaction/MajorCompactionRequest.java      |   2 +-
 .../accumulo/tserver/tablet/Compactor.java      |   9 +-
 .../accumulo/tserver/tablet/MinorCompactor.java |  11 ++
 .../apache/accumulo/tserver/tablet/Tablet.java  |  18 +-
 .../accumulo/tserver/tablet/TabletData.java     |   2 +-
 .../accumulo/test/BulkImportMonitoringIT.java   |   2 +-
 .../accumulo/test/CompactionRateLimitingIT.java |  81 +++++++++
 .../apache/accumulo/test/CreateRandomRFile.java |   2 +-
 .../accumulo/test/GenerateSequentialRFile.java  |   2 +-
 .../apache/accumulo/test/GetFileInfoBulkIT.java |   2 +-
 .../org/apache/accumulo/test/ShellServerIT.java |   4 +-
 .../org/apache/accumulo/test/TestIngest.java    |   3 +-
 .../accumulo/test/functional/BulkFileIT.java    |   6 +-
 .../test/mapred/AccumuloFileOutputFormatIT.java |   2 +-
 .../mapreduce/AccumuloFileOutputFormatIT.java   |   2 +-
 .../performance/metadata/FastBulkImportIT.java  |   2 +-
 .../performance/scan/CollectTabletStats.java    |   4 +-
 .../accumulo/test/proxy/SimpleProxyBase.java    |   2 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       |   2 +-
 .../test/randomwalk/concurrent/BulkImport.java  |   5 +-
 .../test/randomwalk/security/TableOp.java       |   2 +-
 63 files changed, 1253 insertions(+), 318 deletions(-)
----------------------------------------------------------------------



[2/3] accumulo git commit: ACCUMULO-4187: Added rate limiting for major compactions.

Posted by ct...@apache.org.
ACCUMULO-4187: Added rate limiting for major compactions.

Added configuration property tserver.compaction.major.throughput of type PropertyType.MEMORY with a default of 0B (unlimited). If another value is specified (e.g. 30M), each tablet server limits the I/O performed during major compactions accordingly (e.g. reading no more than 30MiB per second and writing no more than 30MiB per second, each totaled across all of that server's major compaction threads).


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/783314c8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/783314c8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/783314c8

Branch: refs/heads/master
Commit: 783314c890c988d7e824148fc97384718f1e3561
Parents: 584b812
Author: Shawn Walker <ac...@shawn-walker.net>
Authored: Wed Apr 6 13:18:09 2016 -0400
Committer: Shawn Walker <ac...@shawn-walker.net>
Committed: Tue Apr 19 13:18:50 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/OfflineIterator.java       |   2 +-
 .../client/mapred/AccumuloFileOutputFormat.java |   2 +-
 .../mapreduce/AccumuloFileOutputFormat.java     |   2 +-
 .../core/client/mock/MockTableOperations.java   |   2 +-
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../accumulo/core/file/BloomFilterLayer.java    |   4 +-
 .../core/file/DispatchingFileFactory.java       |  25 +--
 .../accumulo/core/file/FileOperations.java      |  16 +-
 .../file/blockfile/impl/CachableBlockFile.java  |  53 ++++--
 .../core/file/map/MapFileOperations.java        |  20 +--
 .../accumulo/core/file/rfile/CreateEmpty.java   |   4 +-
 .../core/file/rfile/RFileOperations.java        |  31 ++--
 .../accumulo/core/file/rfile/SplitLarge.java    |   4 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java |  99 +++++-----
 .../bcfile/BoundedRangeFileInputStream.java     | 154 ----------------
 .../streams/BoundedRangeFileInputStream.java    | 152 ++++++++++++++++
 .../streams/PositionedDataOutputStream.java     |  35 ++++
 .../core/file/streams/PositionedOutput.java     |  26 +++
 .../core/file/streams/PositionedOutputs.java    |  68 +++++++
 .../file/streams/RateLimitedInputStream.java    |  69 +++++++
 .../file/streams/RateLimitedOutputStream.java   |  57 ++++++
 .../file/streams/SeekableDataInputStream.java   |  46 +++++
 .../core/util/ratelimit/GuavaRateLimiter.java   |  63 +++++++
 .../core/util/ratelimit/NullRateLimiter.java    |  33 ++++
 .../core/util/ratelimit/RateLimiter.java        |  27 +++
 .../ratelimit/SharedRateLimiterFactory.java     | 180 +++++++++++++++++++
 .../client/mock/MockTableOperationsTest.java    |   2 +-
 .../core/file/BloomFilterLayerLookupTest.java   |   4 +-
 .../accumulo/core/file/FileOperationsTest.java  |   2 +-
 .../core/file/rfile/CreateCompatTestFile.java   |   2 +-
 .../core/file/rfile/MultiLevelIndexTest.java    |   3 +-
 .../accumulo/core/file/rfile/RFileTest.java     |   3 +-
 .../core/file/streams/MockRateLimiter.java      |  38 ++++
 .../streams/RateLimitedInputStreamTest.java     |  69 +++++++
 .../streams/RateLimitedOutputStreamTest.java    |  56 ++++++
 .../accumulo/server/client/BulkImporter.java    |   2 +-
 .../apache/accumulo/server/init/Initialize.java |   2 +-
 .../apache/accumulo/server/util/FileUtil.java   |  10 +-
 .../server/client/BulkImporterTest.java         |   2 +-
 .../apache/accumulo/tserver/FileManager.java    |   2 +-
 .../apache/accumulo/tserver/InMemoryMap.java    |   4 +-
 .../apache/accumulo/tserver/TabletServer.java   |  29 ++-
 .../compaction/MajorCompactionRequest.java      |   2 +-
 .../accumulo/tserver/tablet/Compactor.java      |   9 +-
 .../accumulo/tserver/tablet/MinorCompactor.java |  11 ++
 .../apache/accumulo/tserver/tablet/Tablet.java  |  18 +-
 .../accumulo/tserver/tablet/TabletData.java     |   2 +-
 .../accumulo/test/BulkImportMonitoringIT.java   |   2 +-
 .../accumulo/test/CompactionRateLimitingIT.java |  81 +++++++++
 .../apache/accumulo/test/CreateRandomRFile.java |   2 +-
 .../accumulo/test/GenerateSequentialRFile.java  |   2 +-
 .../apache/accumulo/test/GetFileInfoBulkIT.java |   2 +-
 .../org/apache/accumulo/test/ShellServerIT.java |   4 +-
 .../org/apache/accumulo/test/TestIngest.java    |   3 +-
 .../accumulo/test/functional/BulkFileIT.java    |   6 +-
 .../test/mapred/AccumuloFileOutputFormatIT.java |   2 +-
 .../mapreduce/AccumuloFileOutputFormatIT.java   |   2 +-
 .../performance/metadata/FastBulkImportIT.java  |   2 +-
 .../performance/scan/CollectTabletStats.java    |   4 +-
 .../accumulo/test/proxy/SimpleProxyBase.java    |   2 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       |   2 +-
 .../test/randomwalk/concurrent/BulkImport.java  |   5 +-
 .../test/randomwalk/security/TableOp.java       |   2 +-
 63 files changed, 1253 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
index 487af11..20c53e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -345,7 +345,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
     // TODO need to close files - ACCUMULO-1303
     for (String file : absFiles) {
       FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, null, acuTableConf, null, null);
       if (scannerSamplerConfigImpl != null) {
         reader = reader.getSample(scannerSamplerConfigImpl);
         if (reader == null)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index d636776..8fa5f62 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -186,7 +186,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
         }
 
         if (out == null) {
-          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
+          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index b241f33..2d62279 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -184,7 +184,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
         }
 
         if (out == null) {
-          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
+          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index d465138..aa64a10 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -289,7 +289,7 @@ class MockTableOperations extends TableOperationsHelper {
      */
     for (FileStatus importStatus : fs.listStatus(importPath)) {
       try {
-        FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(),
+        FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(), null,
             AccumuloConfiguration.getDefaultConfiguration());
         while (importIterator.hasTop()) {
           Key key = importIterator.getTopKey();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a2d01e7..bc1e60e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -293,6 +293,8 @@ public enum Property {
       "The maximum number of concurrent tablet migrations for a tablet server"),
   TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
       "The maximum number of concurrent major compactions for a tablet server"),
+  TSERV_MAJC_THROUGHPUT("tserver.compaction.major.throughput", "0B", PropertyType.MEMORY,
+      "Maximum number of bytes to read or write per second over all major compactions on a TabletServer, or 0B for unlimited."),
   TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT,
       "The maximum number of concurrent minor compactions for a tablet server"),
   TSERV_MAJC_TRACE_PERCENT("tserver.compaction.major.trace.percent", "0.1", PropertyType.FRACTION, "The percent of major compactions to trace"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 758df12..c9918bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -458,7 +458,7 @@ public class BloomFilterLayer {
 
     String suffix = FileOperations.getNewFileExtension(acuconf);
     String fname = "/tmp/test." + suffix;
-    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
+    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
 
     long t1 = System.currentTimeMillis();
 
@@ -477,7 +477,7 @@ public class BloomFilterLayer {
     bmfw.close();
 
     t1 = System.currentTimeMillis();
-    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
+    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
     t2 = System.currentTimeMillis();
     out.println("Opened " + fname + " in " + (t2 - t1));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
index 1e7ecc9..9478a29 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.map.MapFileOperations;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -65,8 +66,9 @@ class DispatchingFileFactory extends FileOperations {
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException {
+    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
     if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
       return new BloomFilterLayer.Reader(iter, acuconf);
     }
@@ -74,8 +76,8 @@ class DispatchingFileFactory extends FileOperations {
   }
 
   @Override
-  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+    FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, writeLimiter, acuconf);
     if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
       return new BloomFilterLayer.Writer(writer, acuconf);
     }
@@ -89,32 +91,32 @@ class DispatchingFileFactory extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
-    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
+    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, null, null);
   }
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
     if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
       indexCache = null;
     if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
       dataCache = null;
 
-    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
+    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, dataCache, indexCache);
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
     if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
       indexCache = null;
     if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
       dataCache = null;
 
-    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
+    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, dataCache, indexCache);
     if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
       return new BloomFilterLayer.Reader(iter, acuconf);
     }
@@ -132,5 +134,4 @@ class DispatchingFileFactory extends FileOperations {
 
     return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 3798453..dc7c646 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -55,23 +56,24 @@ public abstract class FileOperations {
    */
 
   public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException;
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
 
   public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
 
   /**
    * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
    *
    */
 
-  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
-      throws IOException;
+  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException;
 
-  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException;
+  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException;
 
-  public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
+  public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf)
+      throws IOException;
 
   public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 07ac5af..6a170e8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.ref.SoftReference;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -33,11 +34,14 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,20 +59,22 @@ public class CachableBlockFile {
   public static class Writer implements BlockFileWriter {
     private BCFile.Writer _bc;
     private BlockWrite _bw;
-    private final FSDataOutputStream fsout;
+    private final PositionedOutput fsout;
     private long length = 0;
 
-    public Writer(FileSystem fs, Path fName, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this.fsout = fs.create(fName);
-      init(fsout, compressAlgor, conf, accumuloConfiguration);
+    public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter, Configuration conf, AccumuloConfiguration accumuloConfiguration)
+        throws IOException {
+      this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf, accumuloConfiguration);
     }
 
-    public Writer(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout, String compressAlgor, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
       this.fsout = fsout;
       init(fsout, compressAlgor, conf, accumuloConfiguration);
     }
 
-    private void init(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+    private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout, String compressAlgor, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
       _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
     }
 
@@ -90,8 +96,8 @@ public class CachableBlockFile {
       _bw.close();
       _bc.close();
 
-      length = this.fsout.getPos();
-      this.fsout.close();
+      length = this.fsout.position();
+      ((OutputStream) this.fsout).close();
     }
 
     @Override
@@ -139,11 +145,12 @@ public class CachableBlockFile {
    *
    */
   public static class Reader implements BlockFileReader {
+    private final RateLimiter readLimiter;
     private BCFile.Reader _bc;
     private String fileName = "not_available";
     private BlockCache _dCache = null;
     private BlockCache _iCache = null;
-    private FSDataInputStream fin = null;
+    private InputStream fin = null;
     private FileSystem fs;
     private Configuration conf;
     private boolean closed = false;
@@ -221,6 +228,11 @@ public class CachableBlockFile {
 
     public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
         throws IOException {
+      this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
+    }
+
+    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
 
       /*
        * Grab path create input stream grab len create file
@@ -232,21 +244,25 @@ public class CachableBlockFile {
       this.fs = fs;
       this.conf = conf;
       this.accumuloConfiguration = accumuloConfiguration;
+      this.readLimiter = readLimiter;
     }
 
-    public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
       this._dCache = data;
       this._iCache = index;
+      this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
 
-    public Reader(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      // this.fin = fsin;
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
 
-    private void init(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+    private <InputStreamT extends InputStream & Seekable> void init(InputStreamT fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration)
+        throws IOException {
       this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
     }
 
@@ -257,8 +273,9 @@ public class CachableBlockFile {
       if (_bc == null) {
         // lazily open file if needed
         Path path = new Path(fileName);
-        fin = fs.open(path);
-        init(fin, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+        RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
+        fin = fsIn;
+        init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
       }
 
       return _bc;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index 75cfa7e..a72a243 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.MapFileIterator;
 import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -141,7 +142,8 @@ public class MapFileOperations extends FileOperations {
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException {
     FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf));
 
     if (seekToBeginning)
@@ -151,10 +153,8 @@ public class MapFileOperations extends FileOperations {
   }
 
   @Override
-  public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
     throw new UnsupportedOperationException();
-
   }
 
   @Override
@@ -169,7 +169,7 @@ public class MapFileOperations extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
     MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf);
 
     FileSKVIterator iter = new RangeIterator(mfIter);
@@ -181,16 +181,16 @@ public class MapFileOperations extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
-    return openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf);
+    return openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf);
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
-    return openReader(file, seekToBeginning, fs, conf, acuconf);
+    return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 75d5567..045bdbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -77,8 +77,8 @@ public class CreateEmpty {
     for (String arg : opts.files) {
       Path path = new Path(arg);
       log.info("Writing to file '" + path + "'");
-      FileSKVWriter writer = (new RFileOperations())
-          .openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
+      FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, null, DefaultConfiguration.getDefaultConfiguration(),
+          opts.codec);
       writer.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index a41785a..730a9d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -36,6 +36,8 @@ import org.apache.accumulo.core.file.rfile.RFile.Writer;
 import org.apache.accumulo.core.sample.Sampler;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -68,16 +70,17 @@ public class RFileOperations extends FileOperations {
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException {
+    return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
     Path path = new Path(file);
 
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, readLimiter, acuconf);
     Reader iter = new RFile.Reader(_cbr);
 
     if (seekToBeginning) {
@@ -89,26 +92,27 @@ public class RFileOperations extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
-    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null);
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, null, null);
     iter.seek(range, columnFamilies, inclusive);
     return iter;
   }
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache);
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, dataCache, indexCache);
     iter.seek(range, columnFamilies, inclusive);
     return iter;
   }
 
   @Override
-  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+    return openWriter(file, fs, conf, writeLimiter, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
   }
 
-  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
+  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf, String compression)
+      throws IOException {
     int hrep = conf.getInt("dfs.replication", -1);
     int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
     int rep = hrep;
@@ -132,7 +136,8 @@ public class RFileOperations extends FileOperations {
       sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
     }
 
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
+        writeLimiter), compression, conf, acuconf);
     Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
     return writer;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index 4e5b232..6c3aab3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -69,8 +69,8 @@ public class SplitLarge {
         String largeName = file.substring(0, file.length() - 3) + "_large.rf";
 
         int blockSize = (int) aconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
-        try (Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", conf, aconf), blockSize);
-            Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", conf, aconf), blockSize)) {
+        try (Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf), blockSize);
+            Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf), blockSize)) {
           small.startDefaultLocalityGroup();
           large.startDefaultLocalityGroup();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 3764603..d7632f3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -45,11 +45,14 @@ import org.apache.accumulo.core.security.crypto.CryptoModule;
 import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
+import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
+import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
@@ -87,7 +90,7 @@ public final class BCFile {
    * BCFile writer, the entry point for creating a new BCFile.
    */
   static public class Writer implements Closeable {
-    private final FSDataOutputStream out;
+    private final PositionedDataOutputStream out;
     private final Configuration conf;
     private final CryptoModule cryptoModule;
     private BCFileCryptoModuleParameters cryptoParams;
@@ -127,7 +130,7 @@ public final class BCFile {
       private final Algorithm compressAlgo;
       private Compressor compressor; // !null only if using native
       // Hadoop compression
-      private final FSDataOutputStream fsOut;
+      private final PositionedDataOutputStream fsOut;
       private final OutputStream cipherOut;
       private final long posStart;
       private final SimpleBufferedOutputStream fsBufferedOutput;
@@ -139,11 +142,11 @@ public final class BCFile {
        * @param cryptoModule
        *          the module to use to obtain cryptographic streams
        */
-      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
-          CryptoModuleParameters cryptoParams) throws IOException {
+      public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf,
+          CryptoModule cryptoModule, CryptoModuleParameters cryptoParams) throws IOException {
         this.compressAlgo = compressionAlgo;
         this.fsOut = fsOut;
-        this.posStart = fsOut.getPos();
+        this.posStart = fsOut.position();
 
         fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf));
 
@@ -211,7 +214,7 @@ public final class BCFile {
        * @return The current byte offset in underlying file.
        */
       long getCurrentPos() throws IOException {
-        return fsOut.getPos() + fsBufferedOutput.size();
+        return fsOut.position() + fsBufferedOutput.size();
       }
 
       long getStartPos() {
@@ -338,18 +341,18 @@ public final class BCFile {
      *          Name of the compression algorithm, which will be used for all data blocks.
      * @see Compression#getSupportedAlgorithms
      */
-    public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      if (fout.getPos() != 0) {
+    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout, String compressionName, Configuration conf,
+        boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration) throws IOException {
+      if (fout.position() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
-      this.out = fout;
+      this.out = new PositionedDataOutputStream(fout);
       this.conf = conf;
       dataIndex = new DataIndex(compressionName, trackDataBlocks);
       metaIndex = new MetaIndex();
       fsOutputBuffer = new BytesWritable();
-      Magic.write(fout);
+      Magic.write(this.out);
 
       // Set up crypto-related detail, including secret key generation and encryption
 
@@ -388,14 +391,14 @@ public final class BCFile {
             appender.close();
           }
 
-          long offsetIndexMeta = out.getPos();
+          long offsetIndexMeta = out.position();
           metaIndex.write(out);
 
           if (cryptoParams.getAlgorithmName() == null || cryptoParams.getAlgorithmName().equals(Property.CRYPTO_CIPHER_SUITE.getDefaultValue())) {
             out.writeLong(offsetIndexMeta);
             API_VERSION_1.write(out);
           } else {
-            long offsetCryptoParameters = out.getPos();
+            long offsetCryptoParameters = out.position();
             cryptoParams.write(out);
 
             // Meta Index, crypto params offsets and the trailing section are written out directly.
@@ -594,7 +597,7 @@ public final class BCFile {
   static public class Reader implements Closeable {
     private static final String META_NAME = "BCFile.metaindex";
     private static final String CRYPTO_BLOCK_NAME = "BCFile.cryptoparams";
-    private final FSDataInputStream in;
+    private final SeekableDataInputStream in;
     private final Configuration conf;
     final DataIndex dataIndex;
     // Index for meta blocks
@@ -613,8 +616,8 @@ public final class BCFile {
       private final BlockRegion region;
       private final InputStream in;
 
-      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
-          Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
+      public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo, InputStreamType fsin, BlockRegion region,
+          Configuration conf, CryptoModule cryptoModule, Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
         this.compressAlgo = compressionAlgo;
         this.region = region;
         this.decompressor = compressionAlgo.getDecompressor();
@@ -752,15 +755,15 @@ public final class BCFile {
      * @param fileLength
      *          Length of the corresponding file
      */
-    public Reader(FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-
-      this.in = fin;
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fin, long fileLength, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
       // Move the cursor to grab the version and the magic first
-      fin.seek(fileLength - Magic.size() - Version.size());
-      version = new Version(fin);
-      Magic.readAndVerify(fin);
+      this.in.seek(fileLength - Magic.size() - Version.size());
+      version = new Version(this.in);
+      Magic.readAndVerify(this.in);
 
       // Do a version check
       if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
@@ -772,26 +775,26 @@ public final class BCFile {
       long offsetCryptoParameters = 0;
 
       if (version.equals(API_VERSION_1)) {
-        fin.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
-        offsetIndexMeta = fin.readLong();
+        this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+        offsetIndexMeta = this.in.readLong();
 
       } else {
-        fin.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
-        offsetIndexMeta = fin.readLong();
-        offsetCryptoParameters = fin.readLong();
+        this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+        offsetIndexMeta = this.in.readLong();
+        offsetCryptoParameters = this.in.readLong();
       }
 
       // read meta index
-      fin.seek(offsetIndexMeta);
-      metaIndex = new MetaIndex(fin);
+      this.in.seek(offsetIndexMeta);
+      metaIndex = new MetaIndex(this.in);
 
       // If they exist, read the crypto parameters
       if (!version.equals(BCFile.API_VERSION_1)) {
 
         // read crypto parameters
-        fin.seek(offsetCryptoParameters);
+        this.in.seek(offsetCryptoParameters);
         cryptoParams = new BCFileCryptoModuleParameters();
-        cryptoParams.read(fin);
+        cryptoParams.read(this.in);
 
         this.cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoParams.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
 
@@ -832,9 +835,9 @@ public final class BCFile {
       }
     }
 
-    public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      this.in = fin;
+    public <InputStreamType extends InputStream & Seekable> Reader(CachableBlockFile.Reader cache, InputStreamType fin, long fileLength, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
       BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
@@ -845,9 +848,9 @@ public final class BCFile {
         // move the cursor to the beginning of the tail, containing: offset to the
         // meta block index, version and magic
         // Move the cursor to grab the version and the magic first
-        fin.seek(fileLength - Magic.size() - Version.size());
-        version = new Version(fin);
-        Magic.readAndVerify(fin);
+        this.in.seek(fileLength - Magic.size() - Version.size());
+        version = new Version(this.in);
+        Magic.readAndVerify(this.in);
 
         // Do a version check
         if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
@@ -859,26 +862,26 @@ public final class BCFile {
         long offsetCryptoParameters = 0;
 
         if (version.equals(API_VERSION_1)) {
-          fin.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
-          offsetIndexMeta = fin.readLong();
+          this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+          offsetIndexMeta = this.in.readLong();
 
         } else {
-          fin.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
-          offsetIndexMeta = fin.readLong();
-          offsetCryptoParameters = fin.readLong();
+          this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+          offsetIndexMeta = this.in.readLong();
+          offsetCryptoParameters = this.in.readLong();
         }
 
         // read meta index
-        fin.seek(offsetIndexMeta);
-        metaIndex = new MetaIndex(fin);
+        this.in.seek(offsetIndexMeta);
+        metaIndex = new MetaIndex(this.in);
 
         // If they exist, read the crypto parameters
         if (!version.equals(BCFile.API_VERSION_1) && cachedCryptoParams == null) {
 
           // read crypto parameters
-          fin.seek(offsetCryptoParameters);
+          this.in.seek(offsetCryptoParameters);
           cryptoParams = new BCFileCryptoModuleParameters();
-          cryptoParams.read(fin);
+          cryptoParams.read(this.in);
 
           if (accumuloConfiguration.getBoolean(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY)) {
             Map<String,String> cryptoConfFromAccumuloConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
deleted file mode 100644
index f93bb84..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.accumulo.core.file.rfile.bcfile;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-/**
- * BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
- * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other.
- */
-class BoundedRangeFileInputStream extends InputStream {
-
-  private FSDataInputStream in;
-  private long pos;
-  private long end;
-  private long mark;
-  private final byte[] oneByte = new byte[1];
-
-  /**
-   * Constructor
-   *
-   * @param in
-   *          The FSDataInputStream we connect to.
-   * @param offset
-   *          Beginning offset of the region.
-   * @param length
-   *          Length of the region.
-   *
-   *          The actual length of the region may be smaller if (off_begin + length) goes beyond the end of FS input stream.
-   */
-  public BoundedRangeFileInputStream(FSDataInputStream in, long offset, long length) {
-    if (offset < 0 || length < 0) {
-      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
-    }
-
-    this.in = in;
-    this.pos = offset;
-    this.end = offset + length;
-    this.mark = -1;
-  }
-
-  @Override
-  public int available() throws IOException {
-    int avail = in.available();
-    if (pos + avail > end) {
-      avail = (int) (end - pos);
-    }
-
-    return avail;
-  }
-
-  @Override
-  public int read() throws IOException {
-    int ret = read(oneByte);
-    if (ret == 1)
-      return oneByte[0] & 0xff;
-    return -1;
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public int read(final byte[] b, final int off, int len) throws IOException {
-    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
-      throw new IndexOutOfBoundsException();
-    }
-
-    final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
-    if (n == 0)
-      return -1;
-    Integer ret = 0;
-    final FSDataInputStream inLocal = in;
-    synchronized (inLocal) {
-      inLocal.seek(pos);
-      try {
-        ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
-          @Override
-          public Integer run() throws IOException {
-            int ret = 0;
-            ret = inLocal.read(b, off, n);
-            return ret;
-          }
-        });
-      } catch (PrivilegedActionException e) {
-        throw (IOException) e.getException();
-      }
-    }
-    if (ret < 0) {
-      end = pos;
-      return -1;
-    }
-    pos += ret;
-    return ret;
-  }
-
-  @Override
-  /*
-   * We may skip beyond the end of the file.
-   */
-  public long skip(long n) throws IOException {
-    long len = Math.min(n, end - pos);
-    pos += len;
-    return len;
-  }
-
-  @Override
-  public void mark(int readlimit) {
-    mark = pos;
-  }
-
-  @Override
-  public void reset() throws IOException {
-    if (mark < 0)
-      throw new IOException("Resetting to invalid mark");
-    pos = mark;
-  }
-
-  @Override
-  public boolean markSupported() {
-    return true;
-  }
-
-  @Override
-  public void close() {
-    // Invalidate the state of the stream.
-    in = null;
-    pos = end;
-    mark = -1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
new file mode 100644
index 0000000..1c01843
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * BoundedRangeFileInputStream abstracts a contiguous region of a seekable input stream (such as a Hadoop FSDataInputStream) as a regular input stream. One can
+ * create multiple BoundedRangeFileInputStream on top of the same underlying stream and they would not interfere with each other.
+ */
+public class BoundedRangeFileInputStream extends InputStream {
+  private InputStream in;
+  private long pos;
+  private long end;
+  private long mark;
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Constructor
+   *
+   * @param in
+   *          The seekable input stream we connect to.
+   * @param offset
+   *          Beginning offset of the region.
+   * @param length
+   *          Length of the region. The actual length of the region may be smaller if (offset + length) goes beyond the end of the underlying stream.
+   * @throws IndexOutOfBoundsException
+   *           if offset or length is negative.
+   */
+  public <StreamType extends InputStream & Seekable> BoundedRangeFileInputStream(StreamType in, long offset, long length) {
+    if (offset < 0 || length < 0) {
+      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
+    }
+    this.in = in;
+    this.pos = offset;
+    this.end = offset + length;
+    this.mark = -1;
+  }
+
+  /** Returns the underlying stream's own estimate of available bytes, capped at the number of bytes left in this region. */
+  @Override
+  public int available() throws IOException {
+    return (int) Math.min((long) in.available(), end - pos);
+  }
+
+  /** Reads a single byte via the bulk read; returns it as an unsigned value, or -1 when no byte could be read. */
+  @Override
+  public int read() throws IOException {
+    int ret = read(oneByte);
+    return ret == 1 ? oneByte[0] & 0xff : -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes at this stream's position. The seek and read run under the shared underlying stream's monitor so they stay paired.
+   *
+   * @return the number of bytes read, 0 if {@code len} is 0, or -1 when the region or the underlying stream is exhausted.
+   */
+  @Override
+  public int read(final byte[] b, final int off, int len) throws IOException {
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0; // InputStream contract: a zero-length read returns 0, it does not signal end of stream
+    }
+    final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
+    if (n == 0) {
+      return -1;
+    }
+    final InputStream inLocal = in;
+    int ret;
+    synchronized (inLocal) {
+      ((Seekable) inLocal).seek(pos);
+      try {
+        ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
+          @Override
+          public Integer run() throws IOException {
+            return inLocal.read(b, off, n);
+          }
+        });
+      } catch (PrivilegedActionException e) {
+        throw (IOException) e.getException();
+      }
+    }
+    if (ret < 0) {
+      end = pos; // the underlying stream ended before the declared region did; shrink the region to match
+      return -1;
+    }
+    pos += ret;
+    return ret;
+  }
+
+  /**
+   * Skips forward by at most {@code n} bytes, stopping at the end of the region (which may lie beyond the end of the file). A negative {@code n} skips nothing.
+   */
+  @Override
+  public long skip(long n) throws IOException {
+    long len = Math.max(0L, Math.min(n, end - pos));
+    pos += len;
+    return len;
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    mark = pos; // readlimit is ignored: only the position is remembered
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (mark < 0)
+      throw new IOException("Resetting to invalid mark");
+    pos = mark;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  @Override
+  public void close() {
+    // Invalidate the state of the stream.
+    in = null;
+    pos = end;
+    mark = -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
new file mode 100644
index 0000000..bd18426
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Adapts an {@code OutputStream} that is also a {@link PositionedOutput} into a {@code DataOutputStream} which still reports that stream's position. */
+public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput {
+  public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream(StreamType type) {
+    super(type);
+  }
+
+  /** Reports the position of the wrapped stream. */
+  @Override
+  public long position() throws IOException {
+    final PositionedOutput positioned = (PositionedOutput) out; // the constructor only accepts PositionedOutput streams
+    return positioned.position();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
new file mode 100644
index 0000000..e5dcba4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+
+/**
+ * For any byte sink (but especially OutputStream), the ability to report how many bytes have been sunk. {@link #position()} is that running byte count.
+ */
+public interface PositionedOutput {
+  public long position() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
new file mode 100644
index 0000000..4769818
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/** Utility functions for {@link PositionedOutput}. */
+public class PositionedOutputs {
+  private PositionedOutputs() {}
+
+  /** Convert an {@code OutputStream} into one implementing {@link PositionedOutput}; its position() throws UnsupportedOperationException if fout has none. */
+  public static PositionedOutputStream wrap(final OutputStream fout) {
+    Objects.requireNonNull(fout);
+    if (fout instanceof FSDataOutputStream) {
+      return new PositionedOutputStream(fout) {
+        @Override
+        public long position() throws IOException {
+          return ((FSDataOutputStream) fout).getPos();
+        }
+      };
+    } else if (fout instanceof PositionedOutput) {
+      return new PositionedOutputStream(fout) {
+        @Override
+        public long position() throws IOException {
+          return ((PositionedOutput) fout).position();
+        }
+      };
+    } else {
+      return new PositionedOutputStream(fout) {
+        @Override
+        public long position() throws IOException {
+          throw new UnsupportedOperationException("Underlying stream does not support position()");
+        }
+      };
+    }
+  }
+
+  /** Return type of {@link #wrap}; public so that callers outside this class can name the type they are handed. */
+  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
+    public PositionedOutputStream(OutputStream stream) {
+      super(stream);
+    }
+
+    @Override
+    public void write(byte[] data, int off, int len) throws IOException {
+      // Forward the whole range in one call; FilterOutputStream's default implementation writes one byte at a time.
+      out.write(data, off, len);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
new file mode 100644
index 0000000..5254086
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.hadoop.fs.Seekable;
+
+/** An {@code InputStream} decorator that charges the bytes it reads against a {@link RateLimiter} and forwards the wrapped stream's {@link Seekable} calls. */
+public class RateLimitedInputStream extends FilterInputStream implements Seekable {
+  private final RateLimiter rateLimiter;
+
+  public <StreamType extends InputStream & Seekable> RateLimitedInputStream(StreamType stream, RateLimiter rateLimiter) {
+    super(stream);
+    this.rateLimiter = rateLimiter != null ? rateLimiter : NullRateLimiter.INSTANCE;
+  }
+
+  @Override
+  public int read() throws IOException {
+    final int value = in.read();
+    if (value < 0) {
+      return value; // end of stream: nothing was read, so nothing is charged
+    }
+    rateLimiter.acquire(1);
+    return value;
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    final int bytesRead = in.read(buffer, offset, length);
+    if (bytesRead <= 0) {
+      return bytesRead;
+    }
+    rateLimiter.acquire(bytesRead); // permits are taken after the read, for the bytes actually delivered
+    return bytesRead;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    ((Seekable) in).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return ((Seekable) in).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return ((Seekable) in).seekToNewSource(targetPos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
new file mode 100644
index 0000000..b426a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+
+/** An {@code OutputStream} decorator that acquires permits from a {@link RateLimiter} before each write, and reports the wrapped stream's position. */
+public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
+  private final RateLimiter writeLimiter;
+
+  public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
+    super(PositionedOutputs.wrap(wrappedStream));
+    this.writeLimiter = writeLimiter != null ? writeLimiter : NullRateLimiter.INSTANCE;
+  }
+
+  @Override
+  public void write(int i) throws IOException {
+    writeLimiter.acquire(1);
+    out.write(i);
+  }
+
+  /** Acquires {@code length} permits up front, then hands the whole range to the wrapped stream in one call. */
+  @Override
+  public void write(byte[] buffer, int offset, int length) throws IOException {
+    writeLimiter.acquire(length);
+    out.write(buffer, offset, length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    out.close();
+  }
+
+  @Override
+  public long position() throws IOException {
+    final PositionedOutput positioned = (PositionedOutput) out;
+    return positioned.position();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
new file mode 100644
index 0000000..09060f5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.Seekable;
+
+/** Wraps a {@link Seekable} {@code InputStream} as a {@link DataInputStream} that forwards the {@code Seekable} operations to the wrapped stream. */
+public class SeekableDataInputStream extends DataInputStream implements Seekable {
+  public <StreamType extends InputStream & Seekable> SeekableDataInputStream(StreamType stream) {
+    super(stream);
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    seekable().seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return seekable().getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return seekable().seekToNewSource(targetPos);
+  }
+
+  private Seekable seekable() { return (Seekable) in; } // the constructor only accepts Seekable streams
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
new file mode 100644
index 0000000..6e9781d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+/** Rate limiter from the Guava library. */
+public class GuavaRateLimiter implements RateLimiter {
+  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
+  private volatile long currentRate; // volatile: setRate() may be called from a different thread than acquire()/getRate()
+
+  /**
+   * Create a limiter with the given initial rate.
+   *
+   * @param initialRate
+   *          Count of permits which should be made available per second. A nonpositive rate is taken to indicate there should be no limitation on rate.
+   */
+  public GuavaRateLimiter(long initialRate) {
+    this.currentRate = initialRate;
+    this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(initialRate > 0 ? initialRate : Long.MAX_VALUE);
+  }
+
+  @Override
+  public long getRate() {
+    return currentRate;
+  }
+
+  /**
+   * Change the rate at which permits are made available.
+   *
+   * @param newRate
+   *          Count of permits which should be made available per second. A nonpositive rate is taken to indicate that there should be no limitation on rate.
+   */
+  public void setRate(long newRate) {
+    this.rateLimiter.setRate(newRate > 0 ? newRate : Long.MAX_VALUE);
+    this.currentRate = newRate;
+  }
+
+  @Override
+  public void acquire(long permits) {
+    if (this.currentRate > 0) { // a nonpositive rate means "unlimited": return without touching the Guava limiter
+      while (permits > Integer.MAX_VALUE) { // the Guava limiter takes an int permit count, so drain oversized requests in chunks
+        rateLimiter.acquire(Integer.MAX_VALUE);
+        permits -= Integer.MAX_VALUE;
+      }
+      if (permits > 0) {
+        rateLimiter.acquire((int) permits);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
new file mode 100644
index 0000000..ac746c8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+/** A rate limiter which doesn't actually limit rates at all. */
+public class NullRateLimiter implements RateLimiter {
+  public static final NullRateLimiter INSTANCE = new NullRateLimiter();
+
+  private NullRateLimiter() {} // private: the shared INSTANCE above is the only instance created here
+
+  @Override
+  public long getRate() {
+    return 0; // a nonpositive rate is how RateLimiter#getRate signals "no limit"
+  }
+
+  @Override
+  public void acquire(long permits) {} // returns immediately: no permits are tracked
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
new file mode 100644
index 0000000..ff64840
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+public interface RateLimiter {
+  /**
+   * Get the current rate of the rate limiter in permits per second, with a nonpositive rate indicating no limit.
+   */
+  public long getRate();
+
+  /** Block until the specified number of permits are available; implementations that impose no limit may return immediately. */
+  public void acquire(long permits);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
new file mode 100644
index 0000000..ac1eec9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.WeakHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides the ability to retrieve a {@link RateLimiter} keyed to a specific string, which will dynamically update its rate according to a specified callback
+ * function.
+ */
+public class SharedRateLimiterFactory {
+  private static final long REPORT_RATE = 60000; // period of report(), in milliseconds
+  private static final long UPDATE_RATE = 1000; // period of update(), in milliseconds
+  private static SharedRateLimiterFactory instance = null;
+  private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
+  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
+
+  private SharedRateLimiterFactory() {}
+
+  /** Get the singleton instance of the SharedRateLimiterFactory, starting its update/report polling timer on first use. */
+  public static synchronized SharedRateLimiterFactory getInstance() {
+    if (instance == null) {
+      instance = new SharedRateLimiterFactory();
+
+      // Daemon timer: background polling must not keep the JVM alive once all other threads have finished.
+      Timer timer = new Timer("SharedRateLimiterFactory update/report polling", true);
+
+      // Update periodically
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          instance.update();
+        }
+      }, UPDATE_RATE, UPDATE_RATE);
+
+      // Report periodically
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          instance.report();
+        }
+      }, REPORT_RATE, REPORT_RATE);
+    }
+    return instance;
+  }
+
+  /**
+   * A callback which provides the current rate for a {@link RateLimiter}.
+   */
+  public static interface RateProvider {
+    /**
+     * Calculate the current rate for the {@link RateLimiter}.
+     *
+     * @return Count of permits which should be provided per second. A nonpositive count is taken to indicate that no rate limiting should be performed.
+     */
+    public long getDesiredRate();
+  }
+
+  /**
+   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. When a limiter already exists for the name it is returned
+   * as is and the supplied {@code rateProvider} is not used.
+   *
+   * @param name
+   *          key for the rate limiter
+   * @param rateProvider
+   *          a function which can be called to get what the current rate for the rate limiter should be.
+   * @return the shared limiter registered under {@code name}
+   */
+  public RateLimiter create(String name, RateProvider rateProvider) {
+    synchronized (activeLimiters) {
+      // Single lookup: with a WeakHashMap, containsKey() followed by get() can in principle see the entry disappear in between.
+      SharedRateLimiter limiter = activeLimiters.get(name);
+      if (limiter == null) {
+        limiter = new SharedRateLimiter(name, rateProvider, rateProvider.getDesiredRate());
+        activeLimiters.put(name, limiter);
+      }
+      return limiter;
+    }
+  }
+
+  /** Copy the active limiters while holding the lock, so that callers can iterate over them without holding it. */
+  private Map<String,SharedRateLimiter> snapshot() {
+    synchronized (activeLimiters) {
+      return ImmutableMap.copyOf(activeLimiters);
+    }
+  }
+
+  /**
+   * Walk through all of the currently active RateLimiters, having each update its current rate. This is called periodically so that we can dynamically update
+   * as configuration changes.
+   */
+  protected void update() {
+    for (Map.Entry<String,SharedRateLimiter> entry : snapshot().entrySet()) {
+      try {
+        entry.getValue().update();
+      } catch (Exception ex) {
+        log.error(String.format("Failed to update limiter %s", entry.getKey()), ex);
+      }
+    }
+  }
+
+  /** Walk through all of the currently active RateLimiters, having each report its activity to the debug log. */
+  protected void report() {
+    for (Map.Entry<String,SharedRateLimiter> entry : snapshot().entrySet()) {
+      try {
+        entry.getValue().report();
+      } catch (Exception ex) {
+        log.error(String.format("Failed to report limiter %s", entry.getKey()), ex);
+      }
+    }
+  }
+
+  /** A GuavaRateLimiter that polls a {@link RateProvider} for its rate and counts the permits acquired since the last report. */
+  protected class SharedRateLimiter extends GuavaRateLimiter {
+    // Atomic counter: acquire() may run concurrently on the threads sharing this limiter while the timer thread resets the count in report().
+    private final java.util.concurrent.atomic.AtomicLong permitsAcquired = new java.util.concurrent.atomic.AtomicLong();
+    private volatile long lastUpdate;
+
+    private final RateProvider rateProvider;
+    private final String name;
+
+    SharedRateLimiter(String name, RateProvider rateProvider, long initialRate) {
+      super(initialRate);
+      this.name = name;
+      this.rateProvider = rateProvider;
+      this.lastUpdate = System.currentTimeMillis();
+    }
+
+    @Override
+    public void acquire(long permits) {
+      super.acquire(permits);
+      permitsAcquired.addAndGet(permits);
+    }
+
+    /** Poll the callback, updating the current rate if necessary. */
+    public void update() {
+      // Reset rate if needed
+      long rate = rateProvider.getDesiredRate();
+      if (rate != getRate()) {
+        setRate(rate);
+      }
+    }
+
+    /** Report the current throughput and usage of this rate limiter to the debug log. */
+    public void report() {
+      if (log.isDebugEnabled()) {
+        long now = System.currentTimeMillis();
+        long duration = now - lastUpdate;
+        if (duration == 0)
+          return;
+        lastUpdate = now;
+
+        long sum = permitsAcquired.getAndSet(0);
+        if (sum > 0) {
+          log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / duration, getRate()));
+        }
+      }
+    }
+  }
+}