You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2016/04/19 19:51:54 UTC
[1/3] accumulo git commit: ACCUMULO-4187: Added rate limiting for
major compactions.
Repository: accumulo
Updated Branches:
refs/heads/master 4f45908c0 -> c8e6fe749
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
index 792e199..ce49f36 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
@@ -225,7 +225,7 @@ public class MockTableOperationsTest {
fs.delete(tempFile, true);
fs.mkdirs(failures);
fs.mkdirs(tempFile.getParent());
- FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, null, AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
List<Pair<Key,Value>> keyVals = new ArrayList<Pair<Key,Value>>();
for (int i = 0; i < 5; i++) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
index ca388e5..0fb4bd6 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
@@ -80,7 +80,7 @@ public class BloomFilterLayerLookupTest {
// get output file name
String suffix = FileOperations.getNewFileExtension(acuconf);
String fname = new File(tempDir.getRoot(), testName + "." + suffix).getAbsolutePath();
- FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
+ FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
// write data to file
long t1 = System.currentTimeMillis();
@@ -96,7 +96,7 @@ public class BloomFilterLayerLookupTest {
bmfw.close();
t1 = System.currentTimeMillis();
- FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
+ FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
t2 = System.currentTimeMillis();
LOG.debug("Opened " + fname + " in " + (t2 - t1));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
index 3fdeb8a..e8ccf35 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
@@ -51,7 +51,7 @@ public class FileOperationsTest {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
- writer = fileOperations.openWriter(filename, fs, conf, acuconf);
+ writer = fileOperations.openWriter(filename, fs, conf, null, acuconf);
writer.close();
} catch (Exception ex) {
caughtException = true;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 3eadc06..ad29a69 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -56,7 +56,7 @@ public class CreateCompatTestFile {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", conf, AccumuloConfiguration.getDefaultConfiguration());
+ CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null, conf, AccumuloConfiguration.getDefaultConfiguration());
RFile.Writer writer = new RFile.Writer(_cbw, 1000);
writer.startNewLocalityGroup("lg1", ncfs(nf("cf_", 1), nf("cf_", 2)));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 66978dd..391bea1 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +55,7 @@ public class MultiLevelIndexTest extends TestCase {
AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", CachedConfiguration.getInstance(), aconf);
+ CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", CachedConfiguration.getInstance(), aconf);
BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index a442347..5f66503 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -86,6 +86,7 @@ import com.google.common.hash.HashCode;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Bytes;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
public class RFileTest {
@@ -224,7 +225,7 @@ public class RFileTest {
public void openWriter(boolean startDLG, int blockSize) throws IOException {
baos = new ByteArrayOutputStream();
dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
+ CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", conf, accumuloConfiguration);
SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
Sampler sampler = null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
new file mode 100644
index 0000000..9574d36
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+
+public class MockRateLimiter implements RateLimiter {
+ private final AtomicLong permitsAcquired = new AtomicLong();
+
+ @Override
+ public long getRate() {
+ return 0;
+ }
+
+ @Override
+ public void acquire(long permits) {
+ permitsAcquired.addAndGet(permits);
+ }
+
+ public long getPermitsAcquired() {
+ return permitsAcquired.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
new file mode 100644
index 0000000..6baff87
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RateLimitedInputStreamTest {
+
+ @Test
+ public void permitsAreProperlyAcquired() throws Exception {
+ Random randGen = new Random();
+ MockRateLimiter rateLimiter = new MockRateLimiter();
+ long bytesRetrieved = 0;
+ try (InputStream is = new RateLimitedInputStream(new RandomInputStream(), rateLimiter)) {
+ for (int i = 0; i < 100; ++i) {
+ int count = Math.abs(randGen.nextInt()) % 65536;
+ int countRead = is.read(new byte[count]);
+ Assert.assertEquals(count, countRead);
+ bytesRetrieved += count;
+ }
+ }
+ Assert.assertEquals(bytesRetrieved, rateLimiter.getPermitsAcquired());
+ }
+
+ private static class RandomInputStream extends InputStream implements Seekable {
+ private final Random r = new Random();
+
+ @Override
+ public int read() throws IOException {
+ return r.nextInt() & 0xff;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new UnsupportedOperationException("Not supported yet."); // To change body of generated methods, choose Tools | Templates.
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ throw new UnsupportedOperationException("Not supported yet."); // To change body of generated methods, choose Tools | Templates.
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ throw new UnsupportedOperationException("Not supported yet."); // To change body of generated methods, choose Tools | Templates.
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
new file mode 100644
index 0000000..9e12354
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RateLimitedOutputStreamTest {
+
+ @Test
+ public void permitsAreProperlyAcquired() throws Exception {
+ Random randGen = new Random();
+ MockRateLimiter rateLimiter = new MockRateLimiter();
+ long bytesWritten = 0;
+ try (RateLimitedOutputStream os = new RateLimitedOutputStream(new NullOutputStream(), rateLimiter)) {
+ for (int i = 0; i < 100; ++i) {
+ byte[] bytes = new byte[Math.abs(randGen.nextInt() % 65536)];
+ os.write(bytes);
+ bytesWritten += bytes.length;
+ }
+ Assert.assertEquals(bytesWritten, os.position());
+ }
+ Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired());
+ }
+
+ public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
+ public NullOutputStream() {
+ super(new CountingOutputStream(ByteStreams.nullOutputStream()));
+ }
+
+ @Override
+ public long position() throws IOException {
+ return ((CountingOutputStream) out).getCount();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 2656d4b..cf55e35 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -642,7 +642,7 @@ public class BulkImporter {
String filename = file.toString();
// log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), context.getConfiguration());
+ FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), null, context.getConfiguration());
try {
Text row = startRow;
if (row == null)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 4378f2c..e58a79c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -465,7 +465,7 @@ public class Initialize implements KeywordExecutable {
createEntriesForTablet(sorted, tablet);
}
FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem();
- FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, AccumuloConfiguration.getDefaultConfiguration());
tabletWriter.startDefaultLocalityGroup();
for (Entry<Key,Value> entry : sorted.entrySet()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 04e17d5..9291808 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -132,7 +132,7 @@ public class FileUtil {
outFiles.add(newMapFile);
FileSystem ns = fs.getVolumeByPath(new Path(newMapFile)).getFileSystem();
- FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), acuConf);
+ FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), null, acuConf);
writer.startDefaultLocalityGroup();
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
@@ -404,7 +404,7 @@ public class FileUtil {
reader = FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf);
else
reader = FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
- ns, ns.getConf(), acuConf);
+ ns, ns.getConf(), null, acuConf);
while (reader.hasTop()) {
Key key = reader.getTopKey();
@@ -428,7 +428,7 @@ public class FileUtil {
readers.add(FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf));
else
readers.add(FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
- ns, ns.getConf(), acuConf));
+ ns, ns.getConf(), null, acuConf));
}
return numKeys;
@@ -445,7 +445,7 @@ public class FileUtil {
FileSKVIterator reader = null;
FileSystem ns = fs.getVolumeByPath(mapfile.path()).getFileSystem();
try {
- reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), acuConf);
+ reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), null, acuConf);
Key firstKey = reader.getFirstKey();
if (firstKey != null) {
@@ -479,7 +479,7 @@ public class FileUtil {
for (FileRef ref : mapFiles) {
Path path = ref.path();
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), acuConf);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, acuConf);
try {
if (!reader.hasTop())
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 0116101..646e400 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -113,7 +113,7 @@ public class BulkImporterTest {
EasyMock.replay(context);
String file = "target/testFile.rf";
fs.delete(new Path(file), true);
- FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), context.getConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), null, context.getConfiguration());
writer.startDefaultLocalityGroup();
Value empty = new Value(new byte[] {});
writer.append(new Key("a", "cf", "cq"), empty);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 2928418..c28d060 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -316,7 +316,7 @@ public class FileManager {
Path path = new Path(file);
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
// log.debug("Opening "+file + " path " + path);
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(),
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), null,
context.getServerConfigurationFactory().getTableConfiguration(tablet), dataCache, indexCache);
reservedFiles.add(reader);
readersReserved.put(reader, file);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index ca4719d..5219e33 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -632,7 +632,7 @@ public class InMemoryMap {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.getLocal(conf);
- reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance());
+ reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, null, SiteConfiguration.getInstance());
if (iflag != null)
reader.setInterruptFlag(iflag);
@@ -804,7 +804,7 @@ public class InMemoryMap {
siteConf = createSampleConfig(siteConf);
}
- FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, siteConf);
+ FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, null, siteConf);
InterruptibleIterator iter = map.skvIterator(null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index f16f726..1523c55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -256,8 +256,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
public class TabletServer extends AccumuloServerContext implements Runnable {
+
private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
@@ -330,7 +333,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
super(confFactory);
this.confFactory = confFactory;
this.fs = fs;
- AccumuloConfiguration aconf = getConfiguration();
+ final AccumuloConfiguration aconf = getConfiguration();
Instance instance = getInstance();
log.info("Version " + Constants.VERSION);
log.info("Instance " + instance.getInstanceID());
@@ -3089,4 +3092,28 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
bulkImportStatus.removeBulkImportStatus(files);
}
+ private static final String MAJC_READ_LIMITER_KEY = "tserv_majc_read";
+ private static final String MAJC_WRITE_LIMITER_KEY = "tserv_majc_write";
+ private final SharedRateLimiterFactory.RateProvider rateProvider = new SharedRateLimiterFactory.RateProvider() {
+ @Override
+ public long getDesiredRate() {
+ return getConfiguration().getMemoryInBytes(Property.TSERV_MAJC_THROUGHPUT);
+ }
+ };
+
+ /**
+ * Get the {@link RateLimiter} for reads during major compactions on this tserver. All writes performed during major compactions are throttled to conform to
+ * this RateLimiter.
+ */
+ public final RateLimiter getMajorCompactionReadLimiter() {
+ return SharedRateLimiterFactory.getInstance().create(MAJC_READ_LIMITER_KEY, rateProvider);
+ }
+
+ /**
+ * Get the RateLimiter for writes during major compations on this tserver. All reads performed during major compactions are throttled to conform to this
+ * RateLimiter.
+ */
+ public final RateLimiter getMajorCompactionWriteLimiter() {
+ return SharedRateLimiterFactory.getInstance().create(MAJC_WRITE_LIMITER_KEY, rateProvider);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 26b1b12..79c0c4f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -77,7 +77,7 @@ public class MajorCompactionRequest implements Cloneable {
// @TODO ensure these files are always closed?
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = volumeManager.getVolumeByPath(ref.path()).getFileSystem();
- FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), tableConfig);
+ FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), null, tableConfig);
return openReader;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index 2ef4a34..bde5be0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -81,6 +82,10 @@ public class Compactor implements Callable<CompactionStats> {
boolean isCompactionEnabled();
IteratorScope getIteratorScope();
+
+ RateLimiter getReadLimiter();
+
+ RateLimiter getWriteLimiter();
}
private final Map<FileRef,DataFileValue> filesToCompact;
@@ -192,7 +197,7 @@ public class Compactor implements Callable<CompactionStats> {
try {
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem();
- mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), acuTableConf);
+ mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), env.getWriteLimiter(), acuTableConf);
Map<String,Set<ByteSequence>> lGroups;
try {
@@ -280,7 +285,7 @@ public class Compactor implements Callable<CompactionStats> {
FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
FileSKVIterator reader;
- reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), acuTableConf);
+ reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), env.getReadLimiter(), acuTableConf);
readers.add(reader);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index e3f8193..6bd2545 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.problems.ProblemReport;
@@ -69,6 +70,16 @@ public class MinorCompactor extends Compactor {
public IteratorScope getIteratorScope() {
return IteratorScope.minc;
}
+
+ @Override
+ public RateLimiter getReadLimiter() {
+ return null;
+ }
+
+ @Override
+ public RateLimiter getWriteLimiter() {
+ return null;
+ }
}, Collections.<IteratorSetting> emptyList(), mincReason.ordinal(), tableConfig);
this.tabletServer = tabletServer;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index cf99dbd..fb47afc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.tserver.tablet;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -153,6 +152,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import static java.util.Objects.requireNonNull;
/**
*
@@ -1583,7 +1584,7 @@ public class Tablet implements TabletCommitter {
for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
FileRef file = entry.getKey();
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
- FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), this.getTableConfiguration());
+ FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), null, this.getTableConfiguration());
try {
Key first = openReader.getFirstKey();
Key last = openReader.getLastKey();
@@ -1807,8 +1808,8 @@ public class Tablet implements TabletCommitter {
AccumuloConfiguration tableConf = createTableConfiguration(tableConfiguration, plan);
Span span = Trace.start("compactFiles");
- try {
+ try {
CompactionEnv cenv = new CompactionEnv() {
@Override
public boolean isCompactionEnabled() {
@@ -1819,6 +1820,17 @@ public class Tablet implements TabletCommitter {
public IteratorScope getIteratorScope() {
return IteratorScope.majc;
}
+
+ @Override
+ public RateLimiter getReadLimiter() {
+ return getTabletServer().getMajorCompactionReadLimiter();
+ }
+
+ @Override
+ public RateLimiter getWriteLimiter() {
+ return getTabletServer().getMajorCompactionWriteLimiter();
+ }
+
};
HashMap<FileRef,DataFileValue> copy = new HashMap<FileRef,DataFileValue>(getDatafileManager().getDatafileSizes());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
index d1cd5ce..d66863e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -149,7 +149,7 @@ public class TabletData {
dataFiles.put(ref, dfv);
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), conf);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, conf);
long maxTime = -1;
try {
while (reader.hasTop()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
index 7ba253f..2f90d7e 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
@@ -98,7 +98,7 @@ public class BulkImportMonitoringIT extends ConfigurableMacBase {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
for (int i = 0; i < 10; i++) {
- FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
for (int j = 0x100; j < 0xfff; j += 3) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java b/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
new file mode 100644
index 0000000..6aa6930
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Random;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompactionRateLimitingIT extends ConfigurableMacBase {
+ public static final long BYTES_TO_WRITE = 10 * 1024 * 1024;
+ public static final long RATE = 1 * 1024 * 1024;
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
+ cfg.setProperty(Property.TSERV_MAJC_THROUGHPUT, RATE + "B");
+ cfg.setProperty(Property.TABLE_MAJC_RATIO, "20");
+ cfg.setProperty(Property.TABLE_FILE_COMPRESSION_TYPE, "none");
+ }
+
+ @Test
+ public void majorCompactionsAreRateLimited() throws Exception {
+ long bytesWritten = 0;
+ String tableName = getUniqueNames(1)[0];
+ Connector conn = getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD));
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ try {
+ Random r = new Random();
+ while (bytesWritten < BYTES_TO_WRITE) {
+ byte[] rowKey = new byte[32];
+ r.nextBytes(rowKey);
+
+ byte[] qual = new byte[32];
+ r.nextBytes(qual);
+
+ byte[] value = new byte[1024];
+ r.nextBytes(value);
+
+ Mutation m = new Mutation(rowKey);
+ m.put(new byte[0], qual, value);
+ bw.addMutation(m);
+
+ bytesWritten += rowKey.length + qual.length + value.length;
+ }
+ } finally {
+ bw.close();
+ }
+
+ conn.tableOperations().flush(tableName, null, null, true);
+
+ long compactionStart = System.currentTimeMillis();
+ conn.tableOperations().compact(tableName, null, null, false, true);
+ long duration = System.currentTimeMillis() - compactionStart;
+ Assert.assertTrue(
+ String.format("Expected a compaction rate of no more than %,d bytes/sec, but saw a rate of %,f bytes/sec", RATE, 1000.0 * bytesWritten / duration),
+ duration > 1000L * BYTES_TO_WRITE / RATE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
index ada8504..3929dc7 100644
--- a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
@@ -69,7 +69,7 @@ public class CreateRandomRFile {
FileSKVWriter mfw;
try {
FileSystem fs = FileSystem.get(conf);
- mfw = new RFileOperations().openWriter(file, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+ mfw = new RFileOperations().openWriter(file, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
index 64a1aaf..21b2b78 100644
--- a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
@@ -58,7 +58,7 @@ public class GenerateSequentialRFile implements Runnable {
final Configuration conf = new Configuration();
Path p = new Path(opts.filePath);
final FileSystem fs = p.getFileSystem(conf);
- FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, DefaultConfiguration.getInstance());
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, null, DefaultConfiguration.getInstance());
writer.startDefaultLocalityGroup();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
index 259a19e..b042d97 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
@@ -123,7 +123,7 @@ public class GetFileInfoBulkIT extends ConfigurableMacBase {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
for (int i = 0; i < 100; i++) {
- FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
for (int j = 0x100; j < 0xfff; j += 3) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 28112ac..0f0c292 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -1338,9 +1338,9 @@ public class ShellServerIT extends SharedMiniClusterBase {
assertTrue(errorsDir.mkdir());
fs.mkdirs(new Path(errorsDir.toString()));
AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
- FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, aconf);
+ FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, null, aconf);
evenWriter.startDefaultLocalityGroup();
- FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, aconf);
+ FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, null, aconf);
oddWriter.startDefaultLocalityGroup();
long timestamp = System.currentTimeMillis();
Text cf = new Text("cf");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 9f79c71..8e196c4 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -218,7 +218,8 @@ public class TestIngest {
if (opts.outputFile != null) {
Configuration conf = CachedConfiguration.getInstance();
- writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+ writer = FileOperations.getInstance()
+ .openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
} else {
bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 1abafeb..9c2ab55 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -74,17 +74,17 @@ public class BulkFileIT extends AccumuloClusterHarness {
fs.delete(new Path(dir), true);
- FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf);
+ FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, null, aconf);
writer1.startDefaultLocalityGroup();
writeData(writer1, 0, 333);
writer1.close();
- FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf);
+ FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, null, aconf);
writer2.startDefaultLocalityGroup();
writeData(writer2, 334, 999);
writer2.close();
- FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf);
+ FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, null, aconf);
writer3.startDefaultLocalityGroup();
writeData(writer3, 1000, 1999);
writer3.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
index fda3713..93ebdb9 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -193,7 +193,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
Configuration conf = CachedConfiguration.getInstance();
DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
- FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+ FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
.getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
assertNotNull(sample);
} else {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
index 883a657..2b172c0 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
@@ -205,7 +205,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
Configuration conf = CachedConfiguration.getInstance();
DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
- FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+ FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
.getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
assertNotNull(sample);
} else {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
index 1c94ff8..9db6dd1 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -90,7 +90,7 @@ public class FastBulkImportIT extends ConfigurableMacBase {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
for (int i = 0; i < 100; i++) {
- FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
for (int j = 0x100; j < 0xfff; j += 3) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index f829b73..fac8c16 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -447,7 +447,7 @@ public class CollectTabletStats {
for (FileRef file : files) {
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf);
Range range = new Range(ke.getPrevEndRow(), false, ke.getEndRow(), true);
reader.seek(range, columnSet, columnSet.size() == 0 ? false : true);
while (reader.hasTop() && !range.afterEndKey(reader.getTopKey())) {
@@ -477,7 +477,7 @@ public class CollectTabletStats {
for (FileRef file : files) {
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
- readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf.getConfiguration()));
+ readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf.getConfiguration()));
}
List<IterInfo> emptyIterinfo = Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
index ed31f58..3ac5b51 100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
@@ -2077,7 +2077,7 @@ public abstract class SimpleProxyBase extends SharedMiniClusterBase {
// Write an RFile
String filename = dir + "/bulk/import/rfile.rf";
- FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), DefaultConfiguration.getInstance());
+ FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), null, DefaultConfiguration.getInstance());
writer.startDefaultLocalityGroup();
writer.append(new org.apache.accumulo.core.data.Key(new Text("a"), new Text("b"), new Text("c")), new Value("value".getBytes(UTF_8)));
writer.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index c54a8e7..26ea422 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@ -83,7 +83,7 @@ public class BulkPlusOne extends BulkImportTest {
for (int i = 0; i < parts; i++) {
String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
- FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), defaultConfiguration);
+ FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, defaultConfiguration);
f.startDefaultLocalityGroup();
int start = rows.get(i);
int end = rows.get(i + 1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
index 5af08ec..ec89a5a 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.test.randomwalk.Environment;
import org.apache.accumulo.test.randomwalk.State;
@@ -52,8 +53,8 @@ public class BulkImport extends Test {
public RFileBatchWriter(Configuration conf, FileSystem fs, String file) throws IOException {
AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
- CachableBlockFile.Writer cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, conf.getInt("io.file.buffer.size", 4096),
- (short) conf.getInt("dfs.replication", 3), conf.getLong("dfs.block.size", 1 << 26)), "gz", conf, aconf);
+ CachableBlockFile.Writer cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(fs.create(new Path(file), false,
+ conf.getInt("io.file.buffer.size", 4096), (short) conf.getInt("dfs.replication", 3), conf.getLong("dfs.block.size", 1 << 26))), "gz", conf, aconf);
writer = new RFile.Writer(cbw, 100000);
writer.startDefaultLocalityGroup();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
index 2612fc9..4a60866 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
@@ -203,7 +203,7 @@ public class TableOp extends Test {
Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
Path fail = new Path(dir.toString() + "_fail");
FileSystem fs = WalkingSecurity.get(state, env).getFs();
- FileSKVWriter f = FileOperations.getInstance().openWriter(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf(),
+ FileSKVWriter f = FileOperations.getInstance().openWriter(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf(), null,
AccumuloConfiguration.getDefaultConfiguration());
f.startDefaultLocalityGroup();
fs.mkdirs(fail);
[3/3] accumulo git commit: Merge branch 'ACCUMULO-4187' of
github:shawnwalker/accumulo
Posted by ct...@apache.org.
Merge branch 'ACCUMULO-4187' of github:shawnwalker/accumulo
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c8e6fe74
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c8e6fe74
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c8e6fe74
Branch: refs/heads/master
Commit: c8e6fe7495496a5e6601b523df381aa086f6510b
Parents: 4f45908 783314c
Author: Christopher Tubbs <ct...@apache.org>
Authored: Tue Apr 19 13:50:48 2016 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Tue Apr 19 13:50:48 2016 -0400
----------------------------------------------------------------------
.../core/client/impl/OfflineIterator.java | 2 +-
.../client/mapred/AccumuloFileOutputFormat.java | 2 +-
.../mapreduce/AccumuloFileOutputFormat.java | 2 +-
.../core/client/mock/MockTableOperations.java | 2 +-
.../org/apache/accumulo/core/conf/Property.java | 2 +
.../accumulo/core/file/BloomFilterLayer.java | 4 +-
.../core/file/DispatchingFileFactory.java | 25 +--
.../accumulo/core/file/FileOperations.java | 16 +-
.../file/blockfile/impl/CachableBlockFile.java | 53 ++++--
.../core/file/map/MapFileOperations.java | 20 +--
.../accumulo/core/file/rfile/CreateEmpty.java | 4 +-
.../core/file/rfile/RFileOperations.java | 31 ++--
.../accumulo/core/file/rfile/SplitLarge.java | 4 +-
.../accumulo/core/file/rfile/bcfile/BCFile.java | 99 +++++-----
.../bcfile/BoundedRangeFileInputStream.java | 154 ----------------
.../streams/BoundedRangeFileInputStream.java | 152 ++++++++++++++++
.../streams/PositionedDataOutputStream.java | 35 ++++
.../core/file/streams/PositionedOutput.java | 26 +++
.../core/file/streams/PositionedOutputs.java | 68 +++++++
.../file/streams/RateLimitedInputStream.java | 69 +++++++
.../file/streams/RateLimitedOutputStream.java | 57 ++++++
.../file/streams/SeekableDataInputStream.java | 46 +++++
.../core/util/ratelimit/GuavaRateLimiter.java | 63 +++++++
.../core/util/ratelimit/NullRateLimiter.java | 33 ++++
.../core/util/ratelimit/RateLimiter.java | 27 +++
.../ratelimit/SharedRateLimiterFactory.java | 180 +++++++++++++++++++
.../client/mock/MockTableOperationsTest.java | 2 +-
.../core/file/BloomFilterLayerLookupTest.java | 4 +-
.../accumulo/core/file/FileOperationsTest.java | 2 +-
.../core/file/rfile/CreateCompatTestFile.java | 2 +-
.../core/file/rfile/MultiLevelIndexTest.java | 3 +-
.../accumulo/core/file/rfile/RFileTest.java | 3 +-
.../core/file/streams/MockRateLimiter.java | 38 ++++
.../streams/RateLimitedInputStreamTest.java | 69 +++++++
.../streams/RateLimitedOutputStreamTest.java | 56 ++++++
.../accumulo/server/client/BulkImporter.java | 2 +-
.../apache/accumulo/server/init/Initialize.java | 2 +-
.../apache/accumulo/server/util/FileUtil.java | 10 +-
.../server/client/BulkImporterTest.java | 2 +-
.../apache/accumulo/tserver/FileManager.java | 2 +-
.../apache/accumulo/tserver/InMemoryMap.java | 4 +-
.../apache/accumulo/tserver/TabletServer.java | 29 ++-
.../compaction/MajorCompactionRequest.java | 2 +-
.../accumulo/tserver/tablet/Compactor.java | 9 +-
.../accumulo/tserver/tablet/MinorCompactor.java | 11 ++
.../apache/accumulo/tserver/tablet/Tablet.java | 18 +-
.../accumulo/tserver/tablet/TabletData.java | 2 +-
.../accumulo/test/BulkImportMonitoringIT.java | 2 +-
.../accumulo/test/CompactionRateLimitingIT.java | 81 +++++++++
.../apache/accumulo/test/CreateRandomRFile.java | 2 +-
.../accumulo/test/GenerateSequentialRFile.java | 2 +-
.../apache/accumulo/test/GetFileInfoBulkIT.java | 2 +-
.../org/apache/accumulo/test/ShellServerIT.java | 4 +-
.../org/apache/accumulo/test/TestIngest.java | 3 +-
.../accumulo/test/functional/BulkFileIT.java | 6 +-
.../test/mapred/AccumuloFileOutputFormatIT.java | 2 +-
.../mapreduce/AccumuloFileOutputFormatIT.java | 2 +-
.../performance/metadata/FastBulkImportIT.java | 2 +-
.../performance/scan/CollectTabletStats.java | 4 +-
.../accumulo/test/proxy/SimpleProxyBase.java | 2 +-
.../test/randomwalk/bulk/BulkPlusOne.java | 2 +-
.../test/randomwalk/concurrent/BulkImport.java | 5 +-
.../test/randomwalk/security/TableOp.java | 2 +-
63 files changed, 1253 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
[2/3] accumulo git commit: ACCUMULO-4187: Added rate limiting for
major compactions.
Posted by ct...@apache.org.
ACCUMULO-4187: Added rate limiting for major compactions.
Added configuration property tserver.compaction.major.throughput of type PropertyType.MEMORY with a default of 0B (unlimited). If another value is specified (e.g. 30M), then all tablet servers will limit the I/O performed during major compaction accordingly (e.g. neither reading nor writing more than 30MiB per second combined over all major compaction threads).
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/783314c8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/783314c8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/783314c8
Branch: refs/heads/master
Commit: 783314c890c988d7e824148fc97384718f1e3561
Parents: 584b812
Author: Shawn Walker <ac...@shawn-walker.net>
Authored: Wed Apr 6 13:18:09 2016 -0400
Committer: Shawn Walker <ac...@shawn-walker.net>
Committed: Tue Apr 19 13:18:50 2016 -0400
----------------------------------------------------------------------
.../core/client/impl/OfflineIterator.java | 2 +-
.../client/mapred/AccumuloFileOutputFormat.java | 2 +-
.../mapreduce/AccumuloFileOutputFormat.java | 2 +-
.../core/client/mock/MockTableOperations.java | 2 +-
.../org/apache/accumulo/core/conf/Property.java | 2 +
.../accumulo/core/file/BloomFilterLayer.java | 4 +-
.../core/file/DispatchingFileFactory.java | 25 +--
.../accumulo/core/file/FileOperations.java | 16 +-
.../file/blockfile/impl/CachableBlockFile.java | 53 ++++--
.../core/file/map/MapFileOperations.java | 20 +--
.../accumulo/core/file/rfile/CreateEmpty.java | 4 +-
.../core/file/rfile/RFileOperations.java | 31 ++--
.../accumulo/core/file/rfile/SplitLarge.java | 4 +-
.../accumulo/core/file/rfile/bcfile/BCFile.java | 99 +++++-----
.../bcfile/BoundedRangeFileInputStream.java | 154 ----------------
.../streams/BoundedRangeFileInputStream.java | 152 ++++++++++++++++
.../streams/PositionedDataOutputStream.java | 35 ++++
.../core/file/streams/PositionedOutput.java | 26 +++
.../core/file/streams/PositionedOutputs.java | 68 +++++++
.../file/streams/RateLimitedInputStream.java | 69 +++++++
.../file/streams/RateLimitedOutputStream.java | 57 ++++++
.../file/streams/SeekableDataInputStream.java | 46 +++++
.../core/util/ratelimit/GuavaRateLimiter.java | 63 +++++++
.../core/util/ratelimit/NullRateLimiter.java | 33 ++++
.../core/util/ratelimit/RateLimiter.java | 27 +++
.../ratelimit/SharedRateLimiterFactory.java | 180 +++++++++++++++++++
.../client/mock/MockTableOperationsTest.java | 2 +-
.../core/file/BloomFilterLayerLookupTest.java | 4 +-
.../accumulo/core/file/FileOperationsTest.java | 2 +-
.../core/file/rfile/CreateCompatTestFile.java | 2 +-
.../core/file/rfile/MultiLevelIndexTest.java | 3 +-
.../accumulo/core/file/rfile/RFileTest.java | 3 +-
.../core/file/streams/MockRateLimiter.java | 38 ++++
.../streams/RateLimitedInputStreamTest.java | 69 +++++++
.../streams/RateLimitedOutputStreamTest.java | 56 ++++++
.../accumulo/server/client/BulkImporter.java | 2 +-
.../apache/accumulo/server/init/Initialize.java | 2 +-
.../apache/accumulo/server/util/FileUtil.java | 10 +-
.../server/client/BulkImporterTest.java | 2 +-
.../apache/accumulo/tserver/FileManager.java | 2 +-
.../apache/accumulo/tserver/InMemoryMap.java | 4 +-
.../apache/accumulo/tserver/TabletServer.java | 29 ++-
.../compaction/MajorCompactionRequest.java | 2 +-
.../accumulo/tserver/tablet/Compactor.java | 9 +-
.../accumulo/tserver/tablet/MinorCompactor.java | 11 ++
.../apache/accumulo/tserver/tablet/Tablet.java | 18 +-
.../accumulo/tserver/tablet/TabletData.java | 2 +-
.../accumulo/test/BulkImportMonitoringIT.java | 2 +-
.../accumulo/test/CompactionRateLimitingIT.java | 81 +++++++++
.../apache/accumulo/test/CreateRandomRFile.java | 2 +-
.../accumulo/test/GenerateSequentialRFile.java | 2 +-
.../apache/accumulo/test/GetFileInfoBulkIT.java | 2 +-
.../org/apache/accumulo/test/ShellServerIT.java | 4 +-
.../org/apache/accumulo/test/TestIngest.java | 3 +-
.../accumulo/test/functional/BulkFileIT.java | 6 +-
.../test/mapred/AccumuloFileOutputFormatIT.java | 2 +-
.../mapreduce/AccumuloFileOutputFormatIT.java | 2 +-
.../performance/metadata/FastBulkImportIT.java | 2 +-
.../performance/scan/CollectTabletStats.java | 4 +-
.../accumulo/test/proxy/SimpleProxyBase.java | 2 +-
.../test/randomwalk/bulk/BulkPlusOne.java | 2 +-
.../test/randomwalk/concurrent/BulkImport.java | 5 +-
.../test/randomwalk/security/TableOp.java | 2 +-
63 files changed, 1253 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
index 487af11..20c53e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -345,7 +345,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
// TODO need to close files - ACCUMULO-1303
for (String file : absFiles) {
FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, null, acuTableConf, null, null);
if (scannerSamplerConfigImpl != null) {
reader = reader.getSample(scannerSamplerConfigImpl);
if (reader == null)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index d636776..8fa5f62 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -186,7 +186,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
}
if (out == null) {
- out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
+ out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
out.startDefaultLocalityGroup();
}
out.append(key, value);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index b241f33..2d62279 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -184,7 +184,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
}
if (out == null) {
- out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
+ out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
out.startDefaultLocalityGroup();
}
out.append(key, value);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index d465138..aa64a10 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -289,7 +289,7 @@ class MockTableOperations extends TableOperationsHelper {
*/
for (FileStatus importStatus : fs.listStatus(importPath)) {
try {
- FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(),
+ FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(), null,
AccumuloConfiguration.getDefaultConfiguration());
while (importIterator.hasTop()) {
Key key = importIterator.getTopKey();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a2d01e7..bc1e60e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -293,6 +293,8 @@ public enum Property {
"The maximum number of concurrent tablet migrations for a tablet server"),
TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
"The maximum number of concurrent major compactions for a tablet server"),
+ TSERV_MAJC_THROUGHPUT("tserver.compaction.major.throughput", "0B", PropertyType.MEMORY,
+ "Maximum number of bytes to read or write per second over all major compactions on a TabletServer, or 0B for unlimited."),
TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT,
"The maximum number of concurrent minor compactions for a tablet server"),
TSERV_MAJC_TRACE_PERCENT("tserver.compaction.major.trace.percent", "0.1", PropertyType.FRACTION, "The percent of major compactions to trace"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 758df12..c9918bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -458,7 +458,7 @@ public class BloomFilterLayer {
String suffix = FileOperations.getNewFileExtension(acuconf);
String fname = "/tmp/test." + suffix;
- FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
+ FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
long t1 = System.currentTimeMillis();
@@ -477,7 +477,7 @@ public class BloomFilterLayer {
bmfw.close();
t1 = System.currentTimeMillis();
- FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
+ FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
t2 = System.currentTimeMillis();
out.println("Opened " + fname + " in " + (t2 - t1));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
index 1e7ecc9..9478a29 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.map.MapFileOperations;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -65,8 +66,9 @@ class DispatchingFileFactory extends FileOperations {
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf) throws IOException {
+ FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
return new BloomFilterLayer.Reader(iter, acuconf);
}
@@ -74,8 +76,8 @@ class DispatchingFileFactory extends FileOperations {
}
@Override
- public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
+ public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+ FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, writeLimiter, acuconf);
if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
return new BloomFilterLayer.Writer(writer, acuconf);
}
@@ -89,32 +91,32 @@ class DispatchingFileFactory extends FileOperations {
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf) throws IOException {
- return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
+ RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
+ return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, null, null);
}
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+ RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
indexCache = null;
if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
dataCache = null;
- return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
+ return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, dataCache, indexCache);
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
- BlockCache dataCache, BlockCache indexCache) throws IOException {
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
indexCache = null;
if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
dataCache = null;
- FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
+ FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, dataCache, indexCache);
if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
return new BloomFilterLayer.Reader(iter, acuconf);
}
@@ -132,5 +134,4 @@ class DispatchingFileFactory extends FileOperations {
return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 3798453..dc7c646 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -55,23 +56,24 @@ public abstract class FileOperations {
*/
public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf) throws IOException;
+ RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
+ RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
/**
* Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
*
*/
- public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
- throws IOException;
+ public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf) throws IOException;
- public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
- BlockCache dataCache, BlockCache indexCache) throws IOException;
+ public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException;
- public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
+ public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf)
+ throws IOException;
public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 07ac5af..6a170e8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.ref.SoftReference;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -33,11 +34,14 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,20 +59,22 @@ public class CachableBlockFile {
public static class Writer implements BlockFileWriter {
private BCFile.Writer _bc;
private BlockWrite _bw;
- private final FSDataOutputStream fsout;
+ private final PositionedOutput fsout;
private long length = 0;
- public Writer(FileSystem fs, Path fName, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
- this.fsout = fs.create(fName);
- init(fsout, compressAlgor, conf, accumuloConfiguration);
+ public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter, Configuration conf, AccumuloConfiguration accumuloConfiguration)
+ throws IOException {
+ this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf, accumuloConfiguration);
}
- public Writer(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout, String compressAlgor, Configuration conf,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
this.fsout = fsout;
init(fsout, compressAlgor, conf, accumuloConfiguration);
}
- private void init(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout, String compressAlgor, Configuration conf,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
_bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
}
@@ -90,8 +96,8 @@ public class CachableBlockFile {
_bw.close();
_bc.close();
- length = this.fsout.getPos();
- this.fsout.close();
+ length = this.fsout.position();
+ ((OutputStream) this.fsout).close();
}
@Override
@@ -139,11 +145,12 @@ public class CachableBlockFile {
*
*/
public static class Reader implements BlockFileReader {
+ private final RateLimiter readLimiter;
private BCFile.Reader _bc;
private String fileName = "not_available";
private BlockCache _dCache = null;
private BlockCache _iCache = null;
- private FSDataInputStream fin = null;
+ private InputStream fin = null;
private FileSystem fs;
private Configuration conf;
private boolean closed = false;
@@ -221,6 +228,11 @@ public class CachableBlockFile {
public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
throws IOException {
+ this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
+ }
+
+ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
/*
* Grab path create input stream grab len create file
@@ -232,21 +244,25 @@ public class CachableBlockFile {
this.fs = fs;
this.conf = conf;
this.accumuloConfiguration = accumuloConfiguration;
+ this.readLimiter = readLimiter;
}
- public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
+ public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
this._dCache = data;
this._iCache = index;
+ this.readLimiter = null;
init(fsin, len, conf, accumuloConfiguration);
}
- public Reader(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
- // this.fin = fsin;
+ public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this.readLimiter = null;
init(fsin, len, conf, accumuloConfiguration);
}
- private void init(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ private <InputStreamT extends InputStream & Seekable> void init(InputStreamT fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration)
+ throws IOException {
this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
}
@@ -257,8 +273,9 @@ public class CachableBlockFile {
if (_bc == null) {
// lazily open file if needed
Path path = new Path(fileName);
- fin = fs.open(path);
- init(fin, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+ RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
+ fin = fsIn;
+ init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
}
return _bc;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index 75cfa7e..a72a243 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MapFileIterator;
import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -141,7 +142,8 @@ public class MapFileOperations extends FileOperations {
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf) throws IOException {
FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf));
if (seekToBeginning)
@@ -151,10 +153,8 @@ public class MapFileOperations extends FileOperations {
}
@Override
- public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-
+ public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
throw new UnsupportedOperationException();
-
}
@Override
@@ -169,7 +169,7 @@ public class MapFileOperations extends FileOperations {
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf) throws IOException {
+ RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf);
FileSKVIterator iter = new RangeIterator(mfIter);
@@ -181,16 +181,16 @@ public class MapFileOperations extends FileOperations {
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+ RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
- return openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf);
+ return openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf);
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
- BlockCache dataCache, BlockCache indexCache) throws IOException {
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
- return openReader(file, seekToBeginning, fs, conf, acuconf);
+ return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 75d5567..045bdbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -77,8 +77,8 @@ public class CreateEmpty {
for (String arg : opts.files) {
Path path = new Path(arg);
log.info("Writing to file '" + path + "'");
- FileSKVWriter writer = (new RFileOperations())
- .openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
+ FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, null, DefaultConfiguration.getDefaultConfiguration(),
+ opts.codec);
writer.close();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index a41785a..730a9d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -36,6 +36,8 @@ import org.apache.accumulo.core.file.rfile.RFile.Writer;
import org.apache.accumulo.core.sample.Sampler;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.sample.impl.SamplerFactory;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -68,16 +70,17 @@ public class RFileOperations extends FileOperations {
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf) throws IOException {
+ return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
- BlockCache dataCache, BlockCache indexCache) throws IOException {
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+ AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
Path path = new Path(file);
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf);
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, readLimiter, acuconf);
Reader iter = new RFile.Reader(_cbr);
if (seekToBeginning) {
@@ -89,26 +92,27 @@ public class RFileOperations extends FileOperations {
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf) throws IOException {
- FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null);
+ RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
+ FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, null, null);
iter.seek(range, columnFamilies, inclusive);
return iter;
}
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
- FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache);
+ RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+ FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, dataCache, indexCache);
iter.seek(range, columnFamilies, inclusive);
return iter;
}
@Override
- public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+ public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+ return openWriter(file, fs, conf, writeLimiter, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
}
- FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
+ FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf, String compression)
+ throws IOException {
int hrep = conf.getInt("dfs.replication", -1);
int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
int rep = hrep;
@@ -132,7 +136,8 @@ public class RFileOperations extends FileOperations {
sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
}
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
+ CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
+ writeLimiter), compression, conf, acuconf);
Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
return writer;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index 4e5b232..6c3aab3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -69,8 +69,8 @@ public class SplitLarge {
String largeName = file.substring(0, file.length() - 3) + "_large.rf";
int blockSize = (int) aconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
- try (Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", conf, aconf), blockSize);
- Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", conf, aconf), blockSize)) {
+ try (Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf), blockSize);
+ Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf), blockSize)) {
small.startDefaultLocalityGroup();
large.startDefaultLocalityGroup();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 3764603..d7632f3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -45,11 +45,14 @@ import org.apache.accumulo.core.security.crypto.CryptoModule;
import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
+import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
+import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
@@ -87,7 +90,7 @@ public final class BCFile {
* BCFile writer, the entry point for creating a new BCFile.
*/
static public class Writer implements Closeable {
- private final FSDataOutputStream out;
+ private final PositionedDataOutputStream out;
private final Configuration conf;
private final CryptoModule cryptoModule;
private BCFileCryptoModuleParameters cryptoParams;
@@ -127,7 +130,7 @@ public final class BCFile {
private final Algorithm compressAlgo;
private Compressor compressor; // !null only if using native
// Hadoop compression
- private final FSDataOutputStream fsOut;
+ private final PositionedDataOutputStream fsOut;
private final OutputStream cipherOut;
private final long posStart;
private final SimpleBufferedOutputStream fsBufferedOutput;
@@ -139,11 +142,11 @@ public final class BCFile {
* @param cryptoModule
* the module to use to obtain cryptographic streams
*/
- public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
- CryptoModuleParameters cryptoParams) throws IOException {
+ public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf,
+ CryptoModule cryptoModule, CryptoModuleParameters cryptoParams) throws IOException {
this.compressAlgo = compressionAlgo;
this.fsOut = fsOut;
- this.posStart = fsOut.getPos();
+ this.posStart = fsOut.position();
fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf));
@@ -211,7 +214,7 @@ public final class BCFile {
* @return The current byte offset in underlying file.
*/
long getCurrentPos() throws IOException {
- return fsOut.getPos() + fsBufferedOutput.size();
+ return fsOut.position() + fsBufferedOutput.size();
}
long getStartPos() {
@@ -338,18 +341,18 @@ public final class BCFile {
* Name of the compression algorithm, which will be used for all data blocks.
* @see Compression#getSupportedAlgorithms
*/
- public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
- if (fout.getPos() != 0) {
+ public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout, String compressionName, Configuration conf,
+ boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ if (fout.position() != 0) {
throw new IOException("Output file not at zero offset.");
}
- this.out = fout;
+ this.out = new PositionedDataOutputStream(fout);
this.conf = conf;
dataIndex = new DataIndex(compressionName, trackDataBlocks);
metaIndex = new MetaIndex();
fsOutputBuffer = new BytesWritable();
- Magic.write(fout);
+ Magic.write(this.out);
// Set up crypto-related detail, including secret key generation and encryption
@@ -388,14 +391,14 @@ public final class BCFile {
appender.close();
}
- long offsetIndexMeta = out.getPos();
+ long offsetIndexMeta = out.position();
metaIndex.write(out);
if (cryptoParams.getAlgorithmName() == null || cryptoParams.getAlgorithmName().equals(Property.CRYPTO_CIPHER_SUITE.getDefaultValue())) {
out.writeLong(offsetIndexMeta);
API_VERSION_1.write(out);
} else {
- long offsetCryptoParameters = out.getPos();
+ long offsetCryptoParameters = out.position();
cryptoParams.write(out);
// Meta Index, crypto params offsets and the trailing section are written out directly.
@@ -594,7 +597,7 @@ public final class BCFile {
static public class Reader implements Closeable {
private static final String META_NAME = "BCFile.metaindex";
private static final String CRYPTO_BLOCK_NAME = "BCFile.cryptoparams";
- private final FSDataInputStream in;
+ private final SeekableDataInputStream in;
private final Configuration conf;
final DataIndex dataIndex;
// Index for meta blocks
@@ -613,8 +616,8 @@ public final class BCFile {
private final BlockRegion region;
private final InputStream in;
- public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
- Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
+ public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo, InputStreamType fsin, BlockRegion region,
+ Configuration conf, CryptoModule cryptoModule, Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
this.compressAlgo = compressionAlgo;
this.region = region;
this.decompressor = compressionAlgo.getDecompressor();
@@ -752,15 +755,15 @@ public final class BCFile {
* @param fileLength
* Length of the corresponding file
*/
- public Reader(FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-
- this.in = fin;
+ public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fin, long fileLength, Configuration conf,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this.in = new SeekableDataInputStream(fin);
this.conf = conf;
// Move the cursor to grab the version and the magic first
- fin.seek(fileLength - Magic.size() - Version.size());
- version = new Version(fin);
- Magic.readAndVerify(fin);
+ this.in.seek(fileLength - Magic.size() - Version.size());
+ version = new Version(this.in);
+ Magic.readAndVerify(this.in);
// Do a version check
if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
@@ -772,26 +775,26 @@ public final class BCFile {
long offsetCryptoParameters = 0;
if (version.equals(API_VERSION_1)) {
- fin.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
- offsetIndexMeta = fin.readLong();
+ this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+ offsetIndexMeta = this.in.readLong();
} else {
- fin.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
- offsetIndexMeta = fin.readLong();
- offsetCryptoParameters = fin.readLong();
+ this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+ offsetIndexMeta = this.in.readLong();
+ offsetCryptoParameters = this.in.readLong();
}
// read meta index
- fin.seek(offsetIndexMeta);
- metaIndex = new MetaIndex(fin);
+ this.in.seek(offsetIndexMeta);
+ metaIndex = new MetaIndex(this.in);
// If they exist, read the crypto parameters
if (!version.equals(BCFile.API_VERSION_1)) {
// read crypto parameters
- fin.seek(offsetCryptoParameters);
+ this.in.seek(offsetCryptoParameters);
cryptoParams = new BCFileCryptoModuleParameters();
- cryptoParams.read(fin);
+ cryptoParams.read(this.in);
this.cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoParams.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
@@ -832,9 +835,9 @@ public final class BCFile {
}
}
- public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
- this.in = fin;
+ public <InputStreamType extends InputStream & Seekable> Reader(CachableBlockFile.Reader cache, InputStreamType fin, long fileLength, Configuration conf,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this.in = new SeekableDataInputStream(fin);
this.conf = conf;
BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
@@ -845,9 +848,9 @@ public final class BCFile {
// move the cursor to the beginning of the tail, containing: offset to the
// meta block index, version and magic
// Move the cursor to grab the version and the magic first
- fin.seek(fileLength - Magic.size() - Version.size());
- version = new Version(fin);
- Magic.readAndVerify(fin);
+ this.in.seek(fileLength - Magic.size() - Version.size());
+ version = new Version(this.in);
+ Magic.readAndVerify(this.in);
// Do a version check
if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
@@ -859,26 +862,26 @@ public final class BCFile {
long offsetCryptoParameters = 0;
if (version.equals(API_VERSION_1)) {
- fin.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
- offsetIndexMeta = fin.readLong();
+ this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+ offsetIndexMeta = this.in.readLong();
} else {
- fin.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
- offsetIndexMeta = fin.readLong();
- offsetCryptoParameters = fin.readLong();
+ this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+ offsetIndexMeta = this.in.readLong();
+ offsetCryptoParameters = this.in.readLong();
}
// read meta index
- fin.seek(offsetIndexMeta);
- metaIndex = new MetaIndex(fin);
+ this.in.seek(offsetIndexMeta);
+ metaIndex = new MetaIndex(this.in);
// If they exist, read the crypto parameters
if (!version.equals(BCFile.API_VERSION_1) && cachedCryptoParams == null) {
// read crypto parameters
- fin.seek(offsetCryptoParameters);
+ this.in.seek(offsetCryptoParameters);
cryptoParams = new BCFileCryptoModuleParameters();
- cryptoParams.read(fin);
+ cryptoParams.read(this.in);
if (accumuloConfiguration.getBoolean(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY)) {
Map<String,String> cryptoConfFromAccumuloConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
deleted file mode 100644
index f93bb84..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.accumulo.core.file.rfile.bcfile;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-/**
- * BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
- * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other.
- */
-class BoundedRangeFileInputStream extends InputStream {
-
- private FSDataInputStream in;
- private long pos;
- private long end;
- private long mark;
- private final byte[] oneByte = new byte[1];
-
- /**
- * Constructor
- *
- * @param in
- * The FSDataInputStream we connect to.
- * @param offset
- * Beginning offset of the region.
- * @param length
- * Length of the region.
- *
- * The actual length of the region may be smaller if (off_begin + length) goes beyond the end of FS input stream.
- */
- public BoundedRangeFileInputStream(FSDataInputStream in, long offset, long length) {
- if (offset < 0 || length < 0) {
- throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
- }
-
- this.in = in;
- this.pos = offset;
- this.end = offset + length;
- this.mark = -1;
- }
-
- @Override
- public int available() throws IOException {
- int avail = in.available();
- if (pos + avail > end) {
- avail = (int) (end - pos);
- }
-
- return avail;
- }
-
- @Override
- public int read() throws IOException {
- int ret = read(oneByte);
- if (ret == 1)
- return oneByte[0] & 0xff;
- return -1;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(final byte[] b, final int off, int len) throws IOException {
- if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
- throw new IndexOutOfBoundsException();
- }
-
- final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
- if (n == 0)
- return -1;
- Integer ret = 0;
- final FSDataInputStream inLocal = in;
- synchronized (inLocal) {
- inLocal.seek(pos);
- try {
- ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
- @Override
- public Integer run() throws IOException {
- int ret = 0;
- ret = inLocal.read(b, off, n);
- return ret;
- }
- });
- } catch (PrivilegedActionException e) {
- throw (IOException) e.getException();
- }
- }
- if (ret < 0) {
- end = pos;
- return -1;
- }
- pos += ret;
- return ret;
- }
-
- @Override
- /*
- * We may skip beyond the end of the file.
- */
- public long skip(long n) throws IOException {
- long len = Math.min(n, end - pos);
- pos += len;
- return len;
- }
-
- @Override
- public void mark(int readlimit) {
- mark = pos;
- }
-
- @Override
- public void reset() throws IOException {
- if (mark < 0)
- throw new IOException("Resetting to invalid mark");
- pos = mark;
- }
-
- @Override
- public boolean markSupported() {
- return true;
- }
-
- @Override
- public void close() {
- // Invalidate the state of the stream.
- in = null;
- pos = end;
- mark = -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
new file mode 100644
index 0000000..1c01843
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
+ * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other.
+ */
+public class BoundedRangeFileInputStream extends InputStream {
+ private InputStream in;
+ private long pos;
+ private long end;
+ private long mark;
+ private final byte[] oneByte = new byte[1];
+
+ /**
+ * Constructor
+ *
+ * @param in
+ * The FSDataInputStream we connect to.
+ * @param offset
+ * Beginning offset of the region.
+ * @param length
+ * Length of the region.
+ *
+ * The actual length of the region may be smaller if (off_begin + length) goes beyond the end of FS input stream.
+ */
+ public <StreamType extends InputStream & Seekable> BoundedRangeFileInputStream(StreamType in, long offset, long length) {
+ if (offset < 0 || length < 0) {
+ throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
+ }
+
+ this.in = in;
+ this.pos = offset;
+ this.end = offset + length;
+ this.mark = -1;
+ }
+
+ @Override
+ public int available() throws IOException {
+ int avail = in.available();
+ if (pos + avail > end) {
+ avail = (int) (end - pos);
+ }
+
+ return avail;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int ret = read(oneByte);
+ if (ret == 1)
+ return oneByte[0] & 0xff;
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, int len) throws IOException {
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
+ if (n == 0)
+ return -1;
+ Integer ret = 0;
+ final InputStream inLocal = in;
+ synchronized (inLocal) {
+ ((Seekable) inLocal).seek(pos);
+ try {
+ ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
+ @Override
+ public Integer run() throws IOException {
+ int ret = 0;
+ ret = inLocal.read(b, off, n);
+ return ret;
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ throw (IOException) e.getException();
+ }
+ }
+ if (ret < 0) {
+ end = pos;
+ return -1;
+ }
+ pos += ret;
+ return ret;
+ }
+
+ @Override
+ /*
+ * We may skip beyond the end of the file.
+ */
+ public long skip(long n) throws IOException {
+ long len = Math.min(n, end - pos);
+ pos += len;
+ return len;
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ mark = pos;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (mark < 0)
+ throw new IOException("Resetting to invalid mark");
+ pos = mark;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // Invalidate the state of the stream.
+ in = null;
+ pos = end;
+ mark = -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
new file mode 100644
index 0000000..bd18426
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A filter converting a {@link PositionedOutput} {@code OutputStream} to a {@code PositionedOutput} {@code DataOutputStream}
+ */
+public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput {
+ public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream(StreamType type) {
+ super(type);
+ }
+
+ @Override
+ public long position() throws IOException {
+ return ((PositionedOutput) out).position();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
new file mode 100644
index 0000000..e5dcba4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+
+/**
+ * For any byte sink (but especially OutputStream), the ability to report how many bytes have been sunk.
+ */
+public interface PositionedOutput {
+ public long position() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
new file mode 100644
index 0000000..4769818
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/**
+ * Utility functions for {@link PositionedOutput}.
+ */
+public class PositionedOutputs {
+ private PositionedOutputs() {}
+
+ /** Convert an {@code OutputStream} into an {@code OutputStream} implementing {@link PositionedOutput}. */
+ public static PositionedOutputStream wrap(final OutputStream fout) {
+ Objects.requireNonNull(fout);
+ if (fout instanceof FSDataOutputStream) {
+ return new PositionedOutputStream(fout) {
+ @Override
+ public long position() throws IOException {
+ return ((FSDataOutputStream) fout).getPos();
+ }
+ };
+ } else if (fout instanceof PositionedOutput) {
+ return new PositionedOutputStream(fout) {
+ @Override
+ public long position() throws IOException {
+ return ((PositionedOutput) fout).position();
+ }
+ };
+ } else {
+ return new PositionedOutputStream(fout) {
+ @Override
+ public long position() throws IOException {
+ throw new UnsupportedOperationException("Underlying stream does not support position()");
+ }
+ };
+ }
+ }
+
+ private static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
+ public PositionedOutputStream(OutputStream stream) {
+ super(stream);
+ }
+
+ @Override
+ public void write(byte[] data, int off, int len) throws IOException {
+ out.write(data, off, len);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
new file mode 100644
index 0000000..5254086
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * A decorator for an {@code InputStream} which limits the rate at which reads are performed.
+ */
+public class RateLimitedInputStream extends FilterInputStream implements Seekable {
+ private final RateLimiter rateLimiter;
+
+ public <StreamType extends InputStream & Seekable> RateLimitedInputStream(StreamType stream, RateLimiter rateLimiter) {
+ super(stream);
+ this.rateLimiter = rateLimiter == null ? NullRateLimiter.INSTANCE : rateLimiter;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int val = in.read();
+ if (val >= 0) {
+ rateLimiter.acquire(1);
+ }
+ return val;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ int count = in.read(buffer, offset, length);
+ if (count > 0) {
+ rateLimiter.acquire(count);
+ }
+ return count;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ ((Seekable) in).seek(pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return ((Seekable) in).getPos();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return ((Seekable) in).seekToNewSource(targetPos);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
new file mode 100644
index 0000000..b426a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+
+/**
+ * A decorator for {@code OutputStream} which limits the rate at which data may be written.
+ */
+public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
+ private final RateLimiter writeLimiter;
+
+ public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
+ super(PositionedOutputs.wrap(wrappedStream));
+ this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter;
+ }
+
+ @Override
+ public void write(int i) throws IOException {
+ writeLimiter.acquire(1);
+ out.write(i);
+ }
+
+ @Override
+ public void write(byte[] buffer, int offset, int length) throws IOException {
+ writeLimiter.acquire(length);
+ out.write(buffer, offset, length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public long position() throws IOException {
+ return ((PositionedOutput) out).position();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
new file mode 100644
index 0000000..09060f5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * A wrapper converting a {@link Seekable} {@code InputStream} into a {@code Seekable} {@link DataInputStream}
+ */
+public class SeekableDataInputStream extends DataInputStream implements Seekable {
+ public <StreamType extends InputStream & Seekable> SeekableDataInputStream(StreamType stream) {
+ super(stream);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ ((Seekable) in).seek(pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return ((Seekable) in).getPos();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return ((Seekable) in).seekToNewSource(targetPos);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
new file mode 100644
index 0000000..6e9781d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+/** Rate limiter from the Guava library. */
+public class GuavaRateLimiter implements RateLimiter {
+ private final com.google.common.util.concurrent.RateLimiter rateLimiter;
+ private long currentRate;
+
+ /**
+ * Constructor
+ *
+ * @param initialRate
+ * Count of permits which should be made available per second. A nonpositive rate is taken to indicate there should be no limitation on rate.
+ */
+ public GuavaRateLimiter(long initialRate) {
+ this.currentRate = initialRate;
+ this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(initialRate > 0 ? initialRate : Long.MAX_VALUE);
+ }
+
+ @Override
+ public long getRate() {
+ return currentRate;
+ }
+
+ /**
+ * Change the rate at which permits are made available.
+ *
+ * @param newRate
+ * Count of permits which should be made available per second. A nonpositive rate is taken to indicate that there should be no limitation on rate.
+ */
+ public void setRate(long newRate) {
+ this.rateLimiter.setRate(newRate > 0 ? newRate : Long.MAX_VALUE);
+ this.currentRate = newRate;
+ }
+
+ @Override
+ public void acquire(long permits) {
+ if (this.currentRate > 0) {
+ while (permits > Integer.MAX_VALUE) {
+ rateLimiter.acquire(Integer.MAX_VALUE);
+ permits -= Integer.MAX_VALUE;
+ }
+ if (permits > 0) {
+ rateLimiter.acquire((int) permits);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
new file mode 100644
index 0000000..ac746c8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+/** A rate limiter which doesn't actually limit rates at all. */
+public class NullRateLimiter implements RateLimiter {
+ public static final NullRateLimiter INSTANCE = new NullRateLimiter();
+
+ private NullRateLimiter() {}
+
+ @Override
+ public long getRate() {
+ return 0;
+ }
+
+ @Override
+ public void acquire(long permits) {}
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
new file mode 100644
index 0000000..ff64840
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+public interface RateLimiter {
+ /**
+ * Get current QPS of the rate limiter, with a nonpositive rate indicating no limit.
+ */
+ public long getRate();
+
+ /** Sleep until the specified number of queries are available. */
+ public void acquire(long permits);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
new file mode 100644
index 0000000..ac1eec9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.WeakHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides the ability to retrieve a {@link RateLimiter} keyed to a specific string, which will dynamically update its rate according to a specified callback
+ * function.
+ */
+public class SharedRateLimiterFactory {
+ private static final long REPORT_RATE = 60000;
+ private static final long UPDATE_RATE = 1000;
+ private static SharedRateLimiterFactory instance = null;
+ private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
+ private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
+
+ private SharedRateLimiterFactory() {}
+
+ /** Get the singleton instance of the SharedRateLimiterFactory. */
+ public static synchronized SharedRateLimiterFactory getInstance() {
+ if (instance == null) {
+ instance = new SharedRateLimiterFactory();
+
+ Timer timer = new Timer("SharedRateLimiterFactory update/report polling");
+
+ // Update periodically
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ instance.update();
+ }
+ }, UPDATE_RATE, UPDATE_RATE);
+
+ // Report periodically
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ instance.report();
+ }
+ }, REPORT_RATE, REPORT_RATE);
+ }
+ return instance;
+ }
+
+ /**
+ * A callback which provides the current rate for a {@link RateLimiter}.
+ */
+ public static interface RateProvider {
+ /**
+ * Calculate the current rate for the {@link RateLimiter}.
+ *
+ * @return Count of permits which should be provided per second. A nonpositive count is taken to indicate that no rate limiting should be performed.
+ */
+ public long getDesiredRate();
+ }
+
+ /**
+ * Lookup the RateLimiter associated with the specified name, or create a new one for that name.
+ *
+ * @param name
+ * key for the rate limiter
+ * @param rateProvider
+ * a function which can be called to get what the current rate for the rate limiter should be.
+ */
+ public RateLimiter create(String name, RateProvider rateProvider) {
+ synchronized (activeLimiters) {
+ if (activeLimiters.containsKey(name)) {
+ SharedRateLimiter limiter = activeLimiters.get(name);
+ return limiter;
+ } else {
+ long initialRate;
+ initialRate = rateProvider.getDesiredRate();
+ SharedRateLimiter limiter = new SharedRateLimiter(name, rateProvider, initialRate);
+ activeLimiters.put(name, limiter);
+ return limiter;
+ }
+ }
+ }
+
+ /**
+ * Walk through all of the currently active RateLimiters, having each update its current rate. This is called periodically so that we can dynamically update
+ * as configuration changes.
+ */
+ protected void update() {
+ Map<String,SharedRateLimiter> limitersCopy;
+ synchronized (activeLimiters) {
+ limitersCopy = ImmutableMap.copyOf(activeLimiters);
+ }
+ for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
+ try {
+ entry.getValue().update();
+ } catch (Exception ex) {
+ log.error(String.format("Failed to update limiter %s", entry.getKey()), ex);
+ }
+ }
+ }
+
+ /** Walk through all of the currently active RateLimiters, having each report its activity to the debug log. */
+ protected void report() {
+ Map<String,SharedRateLimiter> limitersCopy;
+ synchronized (activeLimiters) {
+ limitersCopy = ImmutableMap.copyOf(activeLimiters);
+ }
+ for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
+ try {
+ entry.getValue().report();
+ } catch (Exception ex) {
+ log.error(String.format("Failed to report limiter %s", entry.getKey()), ex);
+ }
+ }
+ }
+
+ protected class SharedRateLimiter extends GuavaRateLimiter {
+ private volatile long permitsAcquired = 0;
+ private volatile long lastUpdate;
+
+ private final RateProvider rateProvider;
+ private final String name;
+
+ SharedRateLimiter(String name, RateProvider rateProvider, long initialRate) {
+ super(initialRate);
+ this.name = name;
+ this.rateProvider = rateProvider;
+ this.lastUpdate = System.currentTimeMillis();
+ }
+
+ @Override
+ public void acquire(long permits) {
+ super.acquire(permits);
+ permitsAcquired += permits;
+ }
+
+ /** Poll the callback, updating the current rate if necessary. */
+ public void update() {
+ // Reset rate if needed
+ long rate = rateProvider.getDesiredRate();
+ if (rate != getRate()) {
+ setRate(rate);
+ }
+ }
+
+ /** Report the current throughput and usage of this rate limiter to the debug log. */
+ public void report() {
+ if (log.isDebugEnabled()) {
+ long duration = System.currentTimeMillis() - lastUpdate;
+ if (duration == 0)
+ return;
+ lastUpdate = System.currentTimeMillis();
+
+ long sum = permitsAcquired;
+ permitsAcquired = 0;
+
+ if (sum > 0) {
+ log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / duration, getRate()));
+ }
+ }
+ }
+ }
+}