You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/11/30 20:53:18 UTC
[accumulo] branch 1.8 updated: ACCUMULO-4744 Fixed RFile API scanner bug (#324)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.8 by this push:
new ed313f7 ACCUMULO-4744 Fixed RFile API scanner bug (#324)
ed313f7 is described below
commit ed313f7703b49343825be26c31f6eed81bc37d83
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Nov 20 16:10:37 2017 -0500
ACCUMULO-4744 Fixed RFile API scanner bug (#324)
---
.../accumulo/core/client/rfile/RFileScanner.java | 3 ++-
.../file/blockfile/impl/CachableBlockFile.java | 10 +++++----
.../accumulo/core/client/rfile/RFileTest.java | 24 ++++++++++++++++++++++
.../core/file/rfile/MultiLevelIndexTest.java | 2 +-
.../apache/accumulo/core/file/rfile/RFileTest.java | 5 +++--
5 files changed, 36 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 4dfba68..bc0df82 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -265,7 +265,8 @@ class RFileScanner extends ScannerOptions implements Scanner {
List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
for (int i = 0; i < sources.length; i++) {
FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
- readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
+
+ readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
AccumuloConfiguration.getDefaultConfiguration())));
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 4fa6634..3ecb5ca 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -147,7 +147,7 @@ public class CachableBlockFile {
public static class Reader implements BlockFileReader {
private final RateLimiter readLimiter;
private BCFile.Reader _bc;
- private String fileName = "not_available";
+ private final String fileName;
private BlockCache _dCache = null;
private BlockCache _iCache = null;
private InputStream fin = null;
@@ -251,16 +251,18 @@ public class CachableBlockFile {
this.readLimiter = readLimiter;
}
- public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
- AccumuloConfiguration accumuloConfiguration) throws IOException {
+ public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
+ BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this.fileName = cacheId;
this._dCache = data;
this._iCache = index;
this.readLimiter = null;
init(fsin, len, conf, accumuloConfiguration);
}
- public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+ public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf,
AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this.fileName = cacheId;
this.readLimiter = null;
init(fsin, len, conf, accumuloConfiguration);
}
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 4993810..8748d8c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -623,4 +624,27 @@ public class RFileTest {
.withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
return reader;
}
+
+ @Test
+ public void testMultipleFilesAndCache() throws Exception {
+ SortedMap<Key,Value> testData = createTestData(100, 10, 10);
+ List<String> files = Arrays.asList(createTmpTestFile(), createTmpTestFile(), createTmpTestFile());
+
+ LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+ for (int i = 0; i < files.size(); i++) {
+ try (RFileWriter writer = RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build()) {
+ for (Entry<Key,Value> entry : testData.entrySet()) {
+ if (entry.getKey().hashCode() % files.size() == i) {
+ writer.append(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ Scanner scanner = RFile.newScanner().from(files.toArray(new String[files.size()])).withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000)
+ .build();
+ Assert.assertEquals(testData, toMap(scanner));
+ scanner.close();
+ }
}
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 08889c0..7d947eb 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -76,7 +76,7 @@ public class MultiLevelIndexTest extends TestCase {
byte[] data = baos.toByteArray();
SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
FSDataInputStream in = new FSDataInputStream(bais);
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance(), aconf);
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source1", in, data.length, CachedConfiguration.getInstance(), aconf);
Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
BlockRead rootIn = _cbr.getMetaBlock("root");
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index fc43ef1..5967f85 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -267,7 +267,8 @@ public class RFileTest {
LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration());
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength, conf, dataCache, indexCache,
+ AccumuloConfiguration.getDefaultConfiguration());
reader = new RFile.Reader(_cbr);
if (cfsi)
iter = new ColumnFamilySkippingIterator(reader);
@@ -1624,7 +1625,7 @@ public class RFileTest {
SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
FSDataInputStream in2 = new FSDataInputStream(bais);
AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length, CachedConfiguration.getInstance(), aconf);
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in2, data.length, CachedConfiguration.getInstance(), aconf);
Reader reader = new RFile.Reader(_cbr);
checkIndex(reader);
--
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.