You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/07/16 16:53:59 UTC
svn commit: r1362072 - in /accumulo/trunk/core/src:
main/java/org/apache/accumulo/core/file/blockfile/
main/java/org/apache/accumulo/core/file/blockfile/cache/
main/java/org/apache/accumulo/core/file/blockfile/impl/
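main/java/org/apache/accumulo/core/file/rfile/
test/java/org/apache/accumulo/core/file/blockfile/cache/
test/java/org/apache/accumulo/core/file/rfile/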
Author: kturner
Date: Mon Jul 16 14:53:58 2012
New Revision: 1362072
URL: http://svn.apache.org/viewvc?rev=1362072&view=rev
Log:
ACCUMULO-473 added transient indexing to cached rfile blocks
Added:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java Mon Jul 16 14:53:58 2012
@@ -34,4 +34,13 @@ public interface ABlockReader extends Da
public void close() throws IOException;
+ /**
+ * Reports whether this block is indexable. An indexable block supports seeking, getting a position, and associating
+ * an arbitrary index with the block; a block that is not indexable may throw {@link UnsupportedOperationException}
+ * from {@link #seek(int)}, {@link #getPosition()} and {@link #getIndex(Class)}.
+ *
+ * @return true if the seek/position/index operations are supported by this block
+ */
+ public boolean isIndexable();
+ /**
+ * Moves the read position of this block. Only valid when {@link #isIndexable()} returns true.
+ *
+ * @param position offset from the start of the block, e.g. a value previously returned by {@link #getPosition()}
+ */
+ public void seek(int position);
+ /**
+ * @return the current read offset from the start of the block; only valid when {@link #isIndexable()} is true
+ */
+ public int getPosition();
+ /**
+ * Returns the index object of type {@code clazz} associated with this block, creating one if none exists.
+ * Only valid when {@link #isIndexable()} is true.
+ */
+ <T> T getIndex(Class<T> clazz);
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java Mon Jul 16 14:53:58 2012
@@ -33,7 +33,7 @@ public interface BlockCache {
* @param inMemory
* Whether block should be treated as in-memory
*/
- public void cacheBlock(String blockName, byte buf[], boolean inMemory);
+ public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory);
/**
* Add block to cache (defaults to not in-memory).
@@ -43,7 +43,7 @@ public interface BlockCache {
* @param buf
* The block contents wrapped in a ByteBuffer.
*/
- public void cacheBlock(String blockName, byte buf[]);
+ public CacheEntry cacheBlock(String blockName, byte buf[]);
/**
* Fetch block from cache.
@@ -52,10 +52,15 @@ public interface BlockCache {
* Block number to fetch.
* @return Block or null if block is not in the cache.
*/
- public byte[] getBlock(String blockName);
+ public CacheEntry getBlock(String blockName);
/**
* Shutdown the cache.
*/
public void shutdown();
+
+ /**
+ * Returns this cache's size limit. CachableBlockFile does not cache a block whose raw size exceeds this value.
+ *
+ * @return the maximum size of the cache (presumably in bytes — confirm per implementation); a cache with no limit,
+ * such as SimpleBlockCache, returns Long.MAX_VALUE
+ */
+ public long getMaxSize();
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java Mon Jul 16 14:53:58 2012
@@ -16,40 +16,11 @@
*/
package org.apache.accumulo.core.file.blockfile.cache;
-public class CacheEntry {
- private String fName;
- private Long hash;
+/**
+ * A block stored in a {@link BlockCache}: the block's bytes plus an optional index object that callers may attach.
+ */
+public interface CacheEntry {
+ /** @return the cached block's bytes */
+ byte[] getBuffer();
- public CacheEntry(String name, Long time) {
- this.hash = time;
- this.fName = name;
- }
+ /** @return the object most recently passed to {@link #setIndex(Object)}, or null if none has been set */
+ public Object getIndex();
- @Override
- public boolean equals(Object other) {
- return
-
- ((CacheEntry) other).getName().equals(fName) && ((CacheEntry) other).getHashInfo().equals(hash) && ((CacheEntry) other).getName().equals(fName)
- && ((CacheEntry) other).getHashInfo().equals(hash);
-
- }
-
- @Override
- public int hashCode() {
- return fName.hashCode() + hash.hashCode();
- }
-
- public String getName() {
- return fName;
- }
-
- public Long getHashInfo() {
-
- return this.hash;
- }
-
- public long length() {
- return fName.length() + Long.SIZE;
- }
+ /**
+ * Attaches an arbitrary index object to this entry, replacing any previous one. The implementations in this package
+ * do not synchronize, so concurrent callers must coordinate access themselves.
+ */
+ public void setIndex(Object idx);
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java Mon Jul 16 14:53:58 2012
@@ -19,6 +19,7 @@
*/
package org.apache.accumulo.core.file.blockfile.cache;
+
/**
* Represents an entry in the {@link LruBlockCache}.
*
@@ -26,7 +27,7 @@ package org.apache.accumulo.core.file.bl
* Makes the block memory-aware with {@link HeapSize} and Comparable to sort by access time for the LRU. It also takes care of priority by either instantiating
* as in-memory or handling the transition from single to multiple access.
*/
-public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
+public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntry {
public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG)
+ ClassSize.STRING + ClassSize.BYTE_BUFFER);
@@ -51,6 +52,7 @@ public class CachedBlock implements Heap
private volatile long accessTime;
private long size;
private BlockPriority priority;
+ private Object index;
public CachedBlock(String blockName, byte buf[], long accessTime) {
this(blockName, buf, accessTime, false);
@@ -88,6 +90,7 @@ public class CachedBlock implements Heap
return this.accessTime < that.accessTime ? 1 : -1;
}
+ @Override
public byte[] getBuffer() {
return this.buf;
}
@@ -99,4 +102,14 @@ public class CachedBlock implements Heap
public BlockPriority getPriority() {
return this.priority;
}
+
+  /** Returns the index object attached through {@link #setIndex(Object)}, or null if none has been attached. */
+  @Override
+  public Object getIndex() {
+    return this.index;
+  }
+
+  /** Attaches {@code idx} to this block, replacing any earlier index object; not synchronized. */
+  @Override
+  public void setIndex(Object idx) {
+    index = idx;
+  }
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java Mon Jul 16 14:53:58 2012
@@ -247,7 +247,7 @@ public class LruBlockCache implements Bl
* @param inMemory
* if block is in-memory
*/
- public void cacheBlock(String blockName, byte buf[], boolean inMemory) {
+ public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
CachedBlock cb = map.get(blockName);
if (cb != null) {
stats.duplicateReads();
@@ -262,6 +262,8 @@ public class LruBlockCache implements Bl
runEviction();
}
}
+
+ return cb;
}
/**
@@ -275,8 +277,8 @@ public class LruBlockCache implements Bl
* @param buf
* block buffer
*/
- public void cacheBlock(String blockName, byte buf[]) {
- cacheBlock(blockName, buf, false);
+ public CacheEntry cacheBlock(String blockName, byte buf[]) {
+ return cacheBlock(blockName, buf, false);
}
/**
@@ -286,7 +288,8 @@ public class LruBlockCache implements Bl
* block name
* @return buffer of specified block name, or null if not in cache
*/
- public byte[] getBlock(String blockName) {
+
+ public CachedBlock getBlock(String blockName) {
CachedBlock cb = map.get(blockName);
if (cb == null) {
stats.miss();
@@ -294,7 +297,7 @@ public class LruBlockCache implements Bl
}
stats.hit();
cb.access(count.incrementAndGet());
- return cb.getBuffer();
+ return cb;
}
protected long evictBlock(CachedBlock block) {
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java Mon Jul 16 14:53:58 2012
@@ -27,18 +27,45 @@ import java.util.Map;
* Simple one RFile soft reference cache.
*/
public class SimpleBlockCache implements BlockCache {
- private static class Ref extends SoftReference<byte[]> {
+
+  /**
+   * Cache entry for {@link SimpleBlockCache}: a final reference to a block's bytes plus an optional, mutable index
+   * object that callers may attach.
+   */
+  private static class SimpleCacheEntry implements CacheEntry {
+
+    // the block bytes; the reference is never reassigned after construction
+    private final byte[] buffer;
+    private Object index;
+
+    SimpleCacheEntry(byte[] buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+
+    @Override
+    public void setIndex(Object idx) {
+      this.index = idx;
+    }
+  }
+
+ private static class Ref extends SoftReference<SimpleCacheEntry> {
public String blockId;
- public Ref(String blockId, byte buf[], ReferenceQueue<byte[]> q) {
- super(buf, q);
+ public Ref(String blockId, SimpleCacheEntry sce, ReferenceQueue<SimpleCacheEntry> q) {
+ super(sce, q);
this.blockId = blockId;
}
}
private Map<String,Ref> cache = new HashMap<String,Ref>();
- private ReferenceQueue<byte[]> q = new ReferenceQueue<byte[]>();
+ private ReferenceQueue<SimpleCacheEntry> q = new ReferenceQueue<SimpleCacheEntry>();
public int dumps = 0;
/**
@@ -64,7 +91,7 @@ public class SimpleBlockCache implements
return cache.size();
}
- public synchronized byte[] getBlock(String blockName) {
+ public synchronized SimpleCacheEntry getBlock(String blockName) {
processQueue(); // clear out some crap.
Ref ref = cache.get(blockName);
if (ref == null)
@@ -72,15 +99,24 @@ public class SimpleBlockCache implements
return ref.get();
}
- public synchronized void cacheBlock(String blockName, byte buf[]) {
- cache.put(blockName, new Ref(blockName, buf, q));
+ public synchronized SimpleCacheEntry cacheBlock(String blockName, byte buf[]) {
+ SimpleCacheEntry sce = new SimpleCacheEntry(buf);
+ cache.put(blockName, new Ref(blockName, sce, q));
+ return sce;
}
- public synchronized void cacheBlock(String blockName, byte buf[], boolean inMemory) {
- cache.put(blockName, new Ref(blockName, buf, q));
+ public synchronized SimpleCacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
+ SimpleCacheEntry sce = new SimpleCacheEntry(buf);
+ cache.put(blockName, new Ref(blockName, sce, q));
+ return sce;
}
public void shutdown() {
// noop
}
+
+ /**
+ * Reports no size limit: entries are held through soft references and reclaimed by the garbage collector rather
+ * than by size-based eviction.
+ *
+ * @return Long.MAX_VALUE
+ */
+ @Override
+ public long getMaxSize() {
+ return Long.MAX_VALUE;
+ }
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java Mon Jul 16 14:53:58 2012
@@ -21,13 +21,14 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.ref.SoftReference;
import org.apache.accumulo.core.file.blockfile.ABlockReader;
import org.apache.accumulo.core.file.blockfile.ABlockWriter;
import org.apache.accumulo.core.file.blockfile.BlockFileReader;
import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
@@ -142,8 +143,8 @@ public class CachableBlockFile {
public static class Reader implements BlockFileReader {
private BCFile.Reader _bc;
private String fileName = "not_available";
- private LruBlockCache _dCache = null;
- private LruBlockCache _iCache = null;
+ private BlockCache _dCache = null;
+ private BlockCache _iCache = null;
private FSDataInputStream fin = null;
private FileSystem fs;
private Configuration conf;
@@ -224,12 +225,18 @@ public class CachableBlockFile {
*/
fileName = dataFile.toString();
- this._dCache = (LruBlockCache) data;
- this._iCache = (LruBlockCache) index;
+ this._dCache = data;
+ this._iCache = index;
this.fs = fs;
this.conf = conf;
}
+ /**
+ * Opens a reader over an already-open stream with block caches. {@code index} is the cache consulted for meta
+ * blocks; {@code data} is presumably the cache for data blocks. Either may be null, which disables caching for that
+ * kind of block.
+ * The caches are assigned before init(...) is called so they are already set if init reads any blocks.
+ */
+ public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index) throws IOException {
+ this._dCache = data;
+ this._iCache = index;
+ init(fsin, len, conf);
+ }
+
public Reader(FSDataInputStream fsin, long len, Configuration conf) throws IOException {
// this.fin = fsin;
init(fsin, len, conf);
@@ -255,13 +262,12 @@ public class CachableBlockFile {
public BlockRead getCachedMetaBlock(String blockName) throws IOException {
String _lookup = fileName + "M" + blockName;
- byte b[] = null;
if (_iCache != null) {
- b = _iCache.getBlock(_lookup);
+ CacheEntry cacheEntry = _iCache.getBlock(_lookup);
- if (b != null) {
- return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+ if (cacheEntry != null) {
+ return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer());
}
}
@@ -287,16 +293,16 @@ public class CachableBlockFile {
}
}
- private BlockRead getBlock(String _lookup, LruBlockCache cache, BlockLoader loader) throws IOException {
+ private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader) throws IOException {
BlockReader _currBlock;
if (cache != null) {
- byte b[] = null;
- b = cache.getBlock(_lookup);
+ CacheEntry cb = null;
+ cb = cache.getBlock(_lookup);
- if (b != null) {
- return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+ if (cb != null) {
+ return new CachedBlockRead(cb, cb.getBuffer());
}
}
@@ -313,7 +319,7 @@ public class CachableBlockFile {
}
- private BlockRead cacheBlock(String _lookup, LruBlockCache cache, BlockReader _currBlock, String block) throws IOException {
+ private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock, String block) throws IOException {
if ((cache == null) || (_currBlock.getRawSize() > cache.getMaxSize())) {
return new BlockRead(_currBlock, _currBlock.getRawSize());
@@ -334,13 +340,17 @@ public class CachableBlockFile {
_currBlock.close();
}
+ CacheEntry ce = null;
try {
- cache.cacheBlock(_lookup, b);
+ ce = cache.cacheBlock(_lookup, b);
} catch (Exception e) {
log.warn("Already cached block: " + _lookup, e);
}
- return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+ if (ce == null)
+ return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+ else
+ return new CachedBlockRead(ce, ce.getBuffer());
}
}
@@ -399,6 +409,82 @@ public class CachableBlockFile {
}
+  /**
+   * A ByteArrayInputStream whose read position can be queried and set. Seek offsets are relative to the start of
+   * {@code buf}, so only the zero-offset constructor is supported.
+   */
+  static class SeekableByteArrayInputStream extends ByteArrayInputStream {
+
+    public SeekableByteArrayInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    public SeekableByteArrayInputStream(byte buf[], int offset, int length) {
+      super(buf, offset, length);
+      // not needed; it exists to document that seek() would only work here if the offset were tracked
+      throw new UnsupportedOperationException("Seek code assumes offset is zero");
+    }
+
+    /**
+     * Moves the read position to {@code position}.
+     *
+     * @throws IllegalArgumentException
+     *           if {@code position} is negative or past the end of the stream
+     */
+    public void seek(int position) {
+      // validate the requested position, not the current one; position == count is the legal end-of-stream position
+      if (position < 0 || position > count)
+        throw new IllegalArgumentException("position = " + position + " count = " + count);
+      this.pos = position;
+    }
+
+    /** @return the offset of the next byte that will be read */
+    public int getPosition() {
+      return this.pos;
+    }
+  }
+
+  /**
+   * A {@link BlockRead} over a block taken from a {@link BlockCache}. Because the whole block is in memory it supports
+   * seeking, reporting its position, and sharing a lazily created index object through its {@link CacheEntry}.
+   */
+  public static class CachedBlockRead extends BlockRead {
+    private SeekableByteArrayInputStream input;
+    private CacheEntry entry;
+
+    public CachedBlockRead(CacheEntry cb, byte buf[]) {
+      this(new SeekableByteArrayInputStream(buf), buf.length);
+      this.entry = cb;
+    }
+
+    private CachedBlockRead(SeekableByteArrayInputStream stream, long size) {
+      super(stream, size);
+      this.input = stream;
+    }
+
+    @Override
+    public boolean isIndexable() {
+      return true;
+    }
+
+    @Override
+    public void seek(int position) {
+      input.seek(position);
+    }
+
+    @Override
+    public int getPosition() {
+      return input.getPosition();
+    }
+
+    /**
+     * Returns the index stored on the cache entry, creating one with {@code clazz}'s no-arg constructor when none is
+     * stored or the previous one has been reclaimed. The index is held through a {@link SoftReference}, so the
+     * garbage collector may discard it under memory pressure. Access is serialized on the cache entry.
+     */
+    @Override
+    public <T> T getIndex(Class<T> clazz) {
+      synchronized (entry) {
+        @SuppressWarnings("unchecked")
+        SoftReference<T> ref = (SoftReference<T>) entry.getIndex();
+        T existing = (ref == null) ? null : ref.get();
+        if (existing != null)
+          return existing;
+
+        T created;
+        try {
+          created = clazz.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        entry.setIndex(new SoftReference<T>(created));
+        return created;
+      }
+    }
+  }
+
/**
*
* Class provides functionality to read one block from the underlying BCFile Since We are caching blocks in the Reader class as bytearrays, this class will
@@ -430,5 +516,25 @@ public class CachableBlockFile {
return this;
}
+ // Plain block reads (those not backed by a cache entry) cannot seek or carry an index.
+ @Override
+ public boolean isIndexable() {
+ return false;
+ }
+
+ // Unsupported: isIndexable() returns false for plain block reads.
+ @Override
+ public void seek(int position) {
+ throw new UnsupportedOperationException();
+ }
+
+ // Unsupported: isIndexable() returns false for plain block reads.
+ @Override
+ public int getPosition() {
+ throw new UnsupportedOperationException();
+ }
+
+ // Unsupported: isIndexable() returns false for plain block reads.
+ @Override
+ public <T> T getIndex(Class<T> clazz) {
+ throw new UnsupportedOperationException();
+ }
+
}
}
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java?rev=1362072&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java Mon Jul 16 14:53:58 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+
+/**
+ * A transient index over the entries of one cached RFile data block, used to seek within the block without scanning
+ * it from the start.
+ *
+ * <p>An instance is attached to a cached block through {@link ABlockReader#getIndex(Class)}. Each call to
+ * {@link #getIndex(ABlockReader, IndexEntry)} counts an access; on every power-of-two access count (from 2 on) the
+ * index may be rebuilt with more sample entries, so frequently seeked blocks get finer-grained indexes.
+ */
+public class BlockIndex {
+
+  /**
+   * Records an access to {@code cacheBlock}, attempting to (re)build its index when the access count is a power of two.
+   *
+   * @param cacheBlock
+   *          an indexable block; when a build is attempted its read position ends up at the start of the block
+   * @param indexEntry
+   *          the RFile index entry for the block, which supplies the block's entry count
+   * @return the block's index, or null if no index has been built for it yet
+   * @throws IOException
+   *           if reading the block fails while building the index
+   */
+  public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
+
+    BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
+
+    int accessCount = blockIndex.accessCount.incrementAndGet();
+
+    // 1 is a power of two, but do not care about it
+    if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
+      blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
+    }
+
+    if (blockIndex.blockIndex != null)
+      return blockIndex;
+
+    return null;
+  }
+
+  private static boolean isPowerOfTwo(int x) {
+    return ((x > 0) && (x & (x - 1)) == 0);
+  }
+
+  private final AtomicInteger accessCount = new AtomicInteger(0);
+  // replaced wholesale (never mutated in place) so seekBlock can take a consistent snapshot without locking
+  private volatile BlockIndexEntry[] blockIndex = null;
+
+  /** One sampled entry of a block: its key, its byte offset in the block, and how many entries remain. */
+  public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
+
+    private Key key;
+    private int entriesLeft;
+    private int pos;
+
+    public BlockIndexEntry(int pos, int entriesLeft, Key key) {
+      this.pos = pos;
+      this.entriesLeft = entriesLeft;
+      this.key = key;
+    }
+
+    /**
+     * Creates a search probe holding only a key, for binary searching an index by key.
+     *
+     * @param key
+     *          the key to search for
+     */
+    public BlockIndexEntry(Key key) {
+      this.key = key;
+    }
+
+    public Key getKey() {
+      return key;
+    }
+
+    /** @return the number of entries in the block from this entry (inclusive) to the end of the block */
+    public int getEntriesLeft() {
+      return entriesLeft;
+    }
+
+    @Override
+    public int compareTo(BlockIndexEntry o) {
+      return key.compareTo(o.key);
+    }
+
+    @Override
+    public String toString() {
+      return key + " " + entriesLeft + " " + pos;
+    }
+  }
+
+  /**
+   * Positions {@code cacheBlock} at the last indexed entry whose key sorts strictly before {@code startKey}, so that
+   * scanning forward from there reaches every entry that is not less than {@code startKey}.
+   *
+   * <p>Blocks may contain repeated keys and only every Nth entry is indexed, so unindexed copies of {@code startKey}
+   * can sit just before an indexed entry equal to {@code startKey}; seeking to such an entry would skip them.
+   *
+   * @return the index entry the block is now positioned at, or null (block position untouched) when no indexed entry
+   *         is known to precede {@code startKey}, in which case the caller must scan from the start of the block
+   */
+  public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
+
+    // get a local ref to the index, another thread could change it
+    BlockIndexEntry[] blockIndex = this.blockIndex;
+
+    int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
+
+    int index;
+
+    if (pos < 0) {
+      if (pos == -1) {
+        // startKey precedes the first indexed key; the block's first entry is never indexed, so the caller must
+        // scan from the beginning of the block
+        return null;
+      }
+      // insertion point - 1: the last indexed entry whose key is less than startKey
+      index = (pos * -1) - 2;
+    } else {
+      // found the exact key; binarySearch may return any of several equal entries, so back up past every indexed
+      // copy of startKey to the last entry strictly less than it
+      index = pos;
+      while (index > 0 && blockIndex[index].getKey().equals(startKey))
+        index--;
+    }
+
+    // every indexed entry up to here equals startKey, and more copies may precede the first indexed entry
+    if (index == 0 && blockIndex[index].getKey().equals(startKey))
+      return null;
+
+    BlockIndexEntry bie = blockIndex[index];
+    cacheBlock.seek(bie.pos);
+    return bie;
+  }
+
+  /**
+   * Rebuilds the index with one entry every {@code numEntries / indexEntries} entries of the block, never indexing the
+   * first entry. Does nothing if that interval would be 32 entries or fewer, or if the current index already has more
+   * than {@code indexEntries - 1} entries. On normal return the block is positioned at its start.
+   */
+  private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
+    cacheBlock.seek(0);
+
+    RelativeKey rk = new RelativeKey();
+    Value val = new Value();
+
+    int interval = indexEntry.getNumEntries() / indexEntries;
+
+    if (interval <= 32)
+      return;
+
+    // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one
+    if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
+      return;
+
+    int count = 0;
+
+    ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
+
+    while (count < (indexEntry.getNumEntries() - interval + 1)) {
+
+      int pos = cacheBlock.getPosition();
+      rk.readFields(cacheBlock);
+      val.readFields(cacheBlock);
+
+      if (count > 0 && count % interval == 0) {
+        index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey()));
+      }
+
+      count++;
+    }
+
+    this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
+
+    cacheBlock.seek(0);
+  }
+}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Mon Jul 16 14:53:58 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.file.blo
import org.apache.accumulo.core.file.blockfile.BlockFileReader;
import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
@@ -670,6 +671,12 @@ public class RFile {
if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
// start key is within the unconsumed portion of the current block
+ // this code intentionally does not use the index associated with a cached block
+ // because if only forward seeks are being done, then there is no benefit to building
+ // an index for the block... could consider using the index if it exists but not
+ // causing the build of an index... doing this could slow down some use cases
+ // and speed up others.
+
MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
RelativeKey tmpRk = new RelativeKey();
Key pKey = new Key(getTopKey());
@@ -717,9 +724,35 @@ public class RFile {
entriesLeft = indexEntry.getNumEntries();
currBlock = getDataBlock(indexEntry);
- MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
RelativeKey tmpRk = new RelativeKey();
- fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, null);
+ MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+
+ Key currKey = null;
+
+ if (currBlock.isIndexable()) {
+ BlockIndex blockIndex = BlockIndex.getIndex(currBlock, indexEntry);
+ if (blockIndex != null) {
+ BlockIndexEntry bie = blockIndex.seekBlock(startKey, currBlock);
+ if (bie != null) {
+ // we are seeked to the current position of the key in the index
+ // need to prime the read process and read this key from the block
+ tmpRk.setPrevKey(bie.getKey());
+ tmpRk.readFields(currBlock);
+ val = new Value();
+
+ val.readFields(currBlock);
+ valbs = new MByteSequence(val.get(), 0, val.getSize());
+
+ // just consumed one key from the input stream, so subtract one from entries left
+ entriesLeft = bie.getEntriesLeft() - 1;
+ prevKey = new Key(bie.getKey());
+ currKey = bie.getKey();
+ }
+ }
+
+ }
+
+ fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
entriesLeft -= fastSkipped;
val = new Value(valbs.toArray());
// set rk when everything above is successful, if exception
@@ -789,7 +822,7 @@ public class RFile {
private AtomicBoolean interruptFlag;
- Reader(BlockFileReader rdr) throws IOException {
+ public Reader(BlockFileReader rdr) throws IOException {
this.reader = rdr;
ABlockReader mb = reader.getMetaBlock("RFile.index");
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java Mon Jul 16 14:53:58 2012
@@ -111,6 +111,10 @@ public class RelativeKey implements Writ
fieldsSame |= DELETED;
}
+ /**
+ * Sets the previous key used as the base when the next entry is decoded with {@link #readFields(DataInput)}
+ * (presumably fields flagged as unchanged are copied from it — confirm against readFields). RFile calls this before
+ * reading an entry at a position found through the block index.
+ */
+ public void setPrevKey(Key pk) {
+ this.prevKey = pk;
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
fieldsSame = in.readByte();
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java Mon Jul 16 14:53:58 2012
@@ -83,9 +83,9 @@ public class TestLruBlockCache extends T
// Check if all blocks are properly cached and retrieved
for (Block block : blocks) {
- byte buf1[] = cache.getBlock(block.blockName);
- assertTrue(buf1 != null);
- assertEquals(buf1.length, block.buf.length);
+ CacheEntry ce = cache.getBlock(block.blockName);
+ assertTrue(ce != null);
+ assertEquals(ce.getBuffer().length, block.buf.length);
}
// Verify correctly calculated cache heap size
@@ -93,9 +93,9 @@ public class TestLruBlockCache extends T
// Check if all blocks are properly cached and retrieved
for (Block block : blocks) {
- byte buf1[] = cache.getBlock(block.blockName);
- assertTrue(buf1 != null);
- assertEquals(buf1.length, block.buf.length);
+ CacheEntry ce = cache.getBlock(block.blockName);
+ assertTrue(ce != null);
+ assertEquals(ce.getBuffer().length, block.buf.length);
}
// Expect no evictions
@@ -138,7 +138,7 @@ public class TestLruBlockCache extends T
assertTrue(cache.getBlock(blocks[0].blockName) == null);
assertTrue(cache.getBlock(blocks[1].blockName) == null);
for (int i = 2; i < blocks.length; i++) {
- assertEquals(cache.getBlock(blocks[i].blockName), blocks[i].buf);
+ assertEquals(cache.getBlock(blocks[i].blockName).getBuffer(), blocks[i].buf);
}
}
@@ -163,7 +163,7 @@ public class TestLruBlockCache extends T
for (Block block : multiBlocks) {
cache.cacheBlock(block.blockName, block.buf);
expectedCacheSize += block.heapSize();
- assertEquals(cache.getBlock(block.blockName), block.buf);
+ assertEquals(cache.getBlock(block.blockName).getBuffer(), block.buf);
}
// Add the single blocks (no get)
@@ -196,8 +196,8 @@ public class TestLruBlockCache extends T
// And all others to be cached
for (int i = 1; i < 4; i++) {
- assertEquals(cache.getBlock(singleBlocks[i].blockName), singleBlocks[i].buf);
- assertEquals(cache.getBlock(multiBlocks[i].blockName), multiBlocks[i].buf);
+ assertEquals(cache.getBlock(singleBlocks[i].blockName).getBuffer(), singleBlocks[i].buf);
+ assertEquals(cache.getBlock(multiBlocks[i].blockName).getBuffer(), multiBlocks[i].buf);
}
}
@@ -429,9 +429,9 @@ public class TestLruBlockCache extends T
// And the newest 5 blocks should still be accessible
for (int i = 5; i < 10; i++) {
- assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName));
- assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName));
- assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName));
+ assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName).getBuffer());
+ assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName).getBuffer());
+ assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName).getBuffer());
}
}
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java Mon Jul 16 14:53:58 2012
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Random;
import java.util.Set;
import junit.framework.TestCase;
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.data.Par
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -177,7 +179,11 @@ public class RFileTest extends TestCase
byte[] data = baos.toByteArray();
bais = new SeekableByteArrayInputStream(data);
in = new FSDataInputStream(bais);
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
+
+ LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
+ LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
+
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf, dataCache, indexCache);
reader = new RFile.Reader(_cbr);
iter = new ColumnFamilySkippingIterator(reader);
@@ -301,10 +307,10 @@ public class RFileTest extends TestCase
}
}
}
-
+
// trf.writer.append(nk("r1","cf1","cq1","L1", 55), nv("foo"));
trf.closeWriter();
-
+
trf.openReader();
// seek before everything
trf.iter.seek(new Range((Key) null, null), EMPTY_COL_FAMS, false);
@@ -384,6 +390,20 @@ public class RFileTest extends TestCase
assertEquals(expectedKeys.get(expectedKeys.size() - 1), trf.reader.getLastKey());
+ // test seeking to random location and reading all data from that point
+ // there was an off by one bug with this in the transient index
+ Random rand = new Random();
+ for (int i = 0; i < 12; i++) {
+ index = rand.nextInt(expectedKeys.size());
+ trf.seek(expectedKeys.get(index));
+ for (; index < expectedKeys.size(); index++) {
+ assertTrue(trf.iter.hasTop());
+ assertEquals(expectedKeys.get(index), trf.iter.getTopKey());
+ assertEquals(expectedValues.get(index), trf.iter.getTopValue());
+ trf.iter.next();
+ }
+ }
+
trf.closeReader();
}
@@ -1203,7 +1223,7 @@ public class RFileTest extends TestCase
assertFalse(trf.iter.hasTop());
trf.iter.seek(new Range(nk("r0000", "cf1", "cq1", "", 1), false, nk("r0001", "cf1", "cq1", "", 1), true), EMPTY_COL_FAMS, false);
-
+
for (int i = 2048; i < 4096; i++) {
assertTrue(trf.iter.hasTop());
assertEquals(nk("r0001", "cf1", "cq1", "", 1), trf.iter.getTopKey());