You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/11/30 20:53:18 UTC

[accumulo] branch 1.8 updated: ACCUMULO-4744 Fixed RFile API scanner bug (#324)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.8 by this push:
     new ed313f7  ACCUMULO-4744 Fixed RFile API scanner bug (#324)
ed313f7 is described below

commit ed313f7703b49343825be26c31f6eed81bc37d83
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Nov 20 16:10:37 2017 -0500

    ACCUMULO-4744 Fixed RFile API scanner bug (#324)
---
 .../accumulo/core/client/rfile/RFileScanner.java   |  3 ++-
 .../file/blockfile/impl/CachableBlockFile.java     | 10 +++++----
 .../accumulo/core/client/rfile/RFileTest.java      | 24 ++++++++++++++++++++++
 .../core/file/rfile/MultiLevelIndexTest.java       |  2 +-
 .../apache/accumulo/core/file/rfile/RFileTest.java |  5 +++--
 5 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 4dfba68..bc0df82 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -265,7 +265,8 @@ class RFileScanner extends ScannerOptions implements Scanner {
       List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
       for (int i = 0; i < sources.length; i++) {
         FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
-        readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
+
+        readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
             AccumuloConfiguration.getDefaultConfiguration())));
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 4fa6634..3ecb5ca 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -147,7 +147,7 @@ public class CachableBlockFile {
   public static class Reader implements BlockFileReader {
     private final RateLimiter readLimiter;
     private BCFile.Reader _bc;
-    private String fileName = "not_available";
+    private final String fileName;
     private BlockCache _dCache = null;
     private BlockCache _iCache = null;
     private InputStream fin = null;
@@ -251,16 +251,18 @@ public class CachableBlockFile {
       this.readLimiter = readLimiter;
     }
 
-    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
+    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
+        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.fileName = cacheId;
       this._dCache = data;
       this._iCache = index;
       this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
 
-    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf,
         AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.fileName = cacheId;
       this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 4993810..8748d8c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -623,4 +624,27 @@ public class RFileTest {
         .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
     return reader;
   }
+
+  @Test
+  public void testMultipleFilesAndCache() throws Exception {
+    SortedMap<Key,Value> testData = createTestData(100, 10, 10);
+    List<String> files = Arrays.asList(createTmpTestFile(), createTmpTestFile(), createTmpTestFile());
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    for (int i = 0; i < files.size(); i++) {
+      try (RFileWriter writer = RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build()) {
+        for (Entry<Key,Value> entry : testData.entrySet()) {
+          if (entry.getKey().hashCode() % files.size() == i) {
+            writer.append(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+    }
+
+    Scanner scanner = RFile.newScanner().from(files.toArray(new String[files.size()])).withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000)
+        .build();
+    Assert.assertEquals(testData, toMap(scanner));
+    scanner.close();
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 08889c0..7d947eb 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -76,7 +76,7 @@ public class MultiLevelIndexTest extends TestCase {
     byte[] data = baos.toByteArray();
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in = new FSDataInputStream(bais);
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source1", in, data.length, CachedConfiguration.getInstance(), aconf);
 
     Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
     BlockRead rootIn = _cbr.getMetaBlock("root");
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index fc43ef1..5967f85 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -267,7 +267,8 @@ public class RFileTest {
       LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
       LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
 
-      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration());
+      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength, conf, dataCache, indexCache,
+          AccumuloConfiguration.getDefaultConfiguration());
       reader = new RFile.Reader(_cbr);
       if (cfsi)
         iter = new ColumnFamilySkippingIterator(reader);
@@ -1624,7 +1625,7 @@ public class RFileTest {
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in2 = new FSDataInputStream(bais);
     AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length, CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in2, data.length, CachedConfiguration.getInstance(), aconf);
     Reader reader = new RFile.Reader(_cbr);
     checkIndex(reader);
 

-- 
To stop receiving notification emails like this one, please contact
commits@accumulo.apache.org.