Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:19:22 UTC
svn commit: r1181554 [2/2] - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/io/hfile/
test/java/org/apache/hadoop/hbase/io/hfile/
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1181554&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Tue Oct 11 02:19:21 2011
@@ -0,0 +1,482 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.Compressor;
+
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHFileBlock {
+
+ private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
+
+ private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
+
+ static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
+ NONE, GZ };
+
+ // In case we need to temporarily switch some test cases to just test gzip.
+ static final Compression.Algorithm[] GZIP_ONLY = { GZ };
+
+ private static final int NUM_TEST_BLOCKS = 1000;
+
+ private static final int NUM_READER_THREADS = 26;
+
+ private static final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private FileSystem fs;
+ private int uncompressedSizeV1;
+
+ @Before
+ public void setUp() throws IOException {
+ fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ TEST_UTIL.initTestDir();
+ }
+
+ public void writeTestBlockContents(DataOutputStream dos) throws IOException {
+ // This compresses really well.
+ for (int i = 0; i < 1000; ++i)
+ dos.writeInt(i / 100);
+ }
+
+ public byte[] createTestV1Block(Compression.Algorithm algo)
+ throws IOException {
+ Compressor compressor = algo.getCompressor();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputStream os = algo.createCompressionStream(baos, compressor, 0);
+ DataOutputStream dos = new DataOutputStream(os);
+ BlockType.META.write(dos); // Let's make this a meta block.
+ writeTestBlockContents(dos);
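+ // A v1 block has no size header, so record the uncompressed size here
+ // for the v1 reader test below, which must be told the size externally.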
+ uncompressedSizeV1 = dos.size();
+ dos.flush();
+ algo.returnCompressor(compressor);
+ return baos.toByteArray();
+ }
+
+ private byte[] createTestV2Block(Compression.Algorithm algo)
+ throws IOException {
+ final BlockType blockType = BlockType.DATA;
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
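+ // The v2 writer prepends a block header (the "DATABLK*" magic, on-disk and
+ // uncompressed sizes, previous block offset), so getHeaderAndData() below
+ // returns the header plus the payload.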
+ DataOutputStream dos = hbw.startWriting(blockType, false);
+ writeTestBlockContents(dos);
+ byte[] headerAndData = hbw.getHeaderAndData();
+ assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
+ hbw.releaseCompressor();
+ return headerAndData;
+ }
+
+ public String createTestBlockStr(Compression.Algorithm algo)
+ throws IOException {
+ return Bytes.toStringBinary(createTestV2Block(algo));
+ }
+
+ @Test
+ public void testNoCompression() throws IOException {
+ assertEquals(4000 + HFileBlock.HEADER_SIZE, createTestV2Block(NONE).length);
+ }
+
+ @Test
+ public void testGzipCompression() throws IOException {
+ assertEquals(
+ "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
+ + "\\xFF\\xFF\\xFF\\xFF\\x1F\\x8B\\x08\\x00\\x00\\x00\\x00\\x00"
+ + "\\x00\\x00\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
+ + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+ + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00",
+ createTestBlockStr(GZ));
+ }
+
+ @Test
+ public void testReaderV1() throws IOException {
+ for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+ for (boolean pread : BOOLEAN_VALUES) {
+ byte[] block = createTestV1Block(algo);
+ Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v1_"
+ + algo);
+ LOG.info("Creating temporary file at " + path);
+ FSDataOutputStream os = fs.create(path);
+ int totalSize = 0;
+ int numBlocks = 50;
+ for (int i = 0; i < numBlocks; ++i) {
+ os.write(block);
+ totalSize += block.length;
+ }
+ os.close();
+
+ FSDataInputStream is = fs.open(path);
+ HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo,
+ totalSize);
+ HFileBlock b;
+ int numBlocksRead = 0;
+ long pos = 0;
+ while (pos < totalSize) {
+ b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
+ b.sanityCheck();
+ pos += block.length;
+ numBlocksRead++;
+ }
+ assertEquals(numBlocks, numBlocksRead);
+ is.close();
+ }
+ }
+ }
+
+ @Test
+ public void testReaderV2() throws IOException {
+ for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+ for (boolean pread : BOOLEAN_VALUES) {
+ Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v2_"
+ + algo);
+ FSDataOutputStream os = fs.create(path);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+ long totalSize = 0;
+ for (int blockId = 0; blockId < 2; ++blockId) {
+ DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
+ for (int i = 0; i < 1234; ++i)
+ dos.writeInt(i);
+ hbw.writeHeaderAndData(os);
+ totalSize += hbw.getOnDiskSizeWithHeader();
+ }
+ os.close();
+
+ FSDataInputStream is = fs.open(path);
+ HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
+ totalSize);
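+ // -1 for the on-disk and uncompressed sizes means "unknown": the reader
+ // must take both from the block header.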
+ HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
+ is.close();
+
+ b.sanityCheck();
+ assertEquals(4936, b.getUncompressedSizeWithoutHeader());
+ assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader());
+ String blockStr = b.toString();
+
+ if (algo == GZ) {
+ is = fs.open(path);
+ hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
+ b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1, pread);
+ assertEquals(blockStr, b.toString());
+ int wrongCompressedSize = 2172;
+ try {
+ b = hbr.readBlockData(0, wrongCompressedSize
+ + HFileBlock.HEADER_SIZE, -1, pread);
+ fail("Exception expected");
+ } catch (IOException ex) {
+ String expectedPrefix = "On-disk size without header provided is "
+ + wrongCompressedSize + ", but block header contains "
+ + b.getOnDiskSizeWithoutHeader() + ".";
+ assertTrue("Invalid exception message: '" + ex.getMessage()
+ + "'.\nMessage is expected to start with: '" + expectedPrefix
+ + "'", ex.getMessage().startsWith(expectedPrefix));
+ }
+ is.close();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testPreviousOffset() throws IOException {
+ for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+ for (boolean pread : BOOLEAN_VALUES) {
+ for (boolean cacheOnWrite : BOOLEAN_VALUES) {
+ Random rand = defaultRandom();
+ LOG.info("Compression algorithm: " + algo + ", pread=" + pread);
+ Path path = new Path(HBaseTestingUtility.getTestDir(), "prev_offset");
+ List<Long> expectedOffsets = new ArrayList<Long>();
+ List<Long> expectedPrevOffsets = new ArrayList<Long>();
+ List<BlockType> expectedTypes = new ArrayList<BlockType>();
+ List<ByteBuffer> expectedContents = cacheOnWrite
+ ? new ArrayList<ByteBuffer>() : null;
+ long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
+ expectedPrevOffsets, expectedTypes, expectedContents, true);
+
+ FSDataInputStream is = fs.open(path);
+ HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
+ totalSize);
+ long curOffset = 0;
+ for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
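+ // Without pread the reader also fetches the next block's header, leaving
+ // the stream one header size past the block boundary after every read but
+ // the first.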
+ if (!pread) {
+ assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
+ HFileBlock.HEADER_SIZE));
+ }
+
+ assertEquals(expectedOffsets.get(i).longValue(), curOffset);
+
+ LOG.info("Reading block #" + i + " at offset " + curOffset);
+ HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
+ LOG.info("Block #" + i + ": " + b);
+ assertEquals("Invalid block #" + i + "'s type:",
+ expectedTypes.get(i), b.getBlockType());
+ assertEquals("Invalid previous block offset for block " + i
+ + " of " + "type " + b.getBlockType() + ":",
+ (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
+ b.sanityCheck();
+ assertEquals(curOffset, b.getOffset());
+
+ // Now re-load this block knowing the on-disk size. This tests a
+ // different branch in the loader.
+ HFileBlock b2 = hbr.readBlockData(curOffset,
+ b.getOnDiskSizeWithHeader(), -1, pread);
+ b2.sanityCheck();
+
+ assertEquals(b.getBlockType(), b2.getBlockType());
+ assertEquals(b.getOnDiskSizeWithoutHeader(),
+ b2.getOnDiskSizeWithoutHeader());
+ assertEquals(b.getOnDiskSizeWithHeader(),
+ b2.getOnDiskSizeWithHeader());
+ assertEquals(b.getUncompressedSizeWithoutHeader(),
+ b2.getUncompressedSizeWithoutHeader());
+ assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
+ assertEquals(curOffset, b2.getOffset());
+
+ curOffset += b.getOnDiskSizeWithHeader();
+
+ if (cacheOnWrite) {
+ // In the cache-on-write mode we store uncompressed bytes so we
+ // can compare them to what was read by the block reader.
+
+ ByteBuffer bufRead = b.getBufferWithHeader();
+ ByteBuffer bufExpected = expectedContents.get(i);
+ boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
+ bufRead.arrayOffset(), bufRead.limit(),
+ bufExpected.array(), bufExpected.arrayOffset(),
+ bufExpected.limit()) == 0;
+ String wrongBytesMsg = "";
+
+ if (!bytesAreCorrect) {
+ // Optimization: only construct an error message in case we
+ // will need it.
+ wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
+ + algo + ", pread=" + pread + "):\n";
+ wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
+ bufExpected.arrayOffset(), Math.min(32,
+ bufExpected.limit()))
+ + ", actual:\n"
+ + Bytes.toStringBinary(bufRead.array(),
+ bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
+ }
+
+ assertTrue(wrongBytesMsg, bytesAreCorrect);
+ }
+ }
+
+ assertEquals(curOffset, fs.getFileStatus(path).getLen());
+ is.close();
+ }
+ }
+ }
+ }
+
+ private Random defaultRandom() {
+ return new Random(189237);
+ }
+
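+ /**
+ * Reads blocks at random offsets through a shared reader for ten seconds,
+ * verifying each block's type, on-disk size, and offset.
+ */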
+ private class BlockReaderThread implements Callable<Boolean> {
+ private final String clientId;
+ private final HFileBlock.FSReader hbr;
+ private final List<Long> offsets;
+ private final List<BlockType> types;
+ private final long fileSize;
+
+ public BlockReaderThread(String clientId,
+ HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
+ long fileSize) {
+ this.clientId = clientId;
+ this.offsets = offsets;
+ this.hbr = hbr;
+ this.types = types;
+ this.fileSize = fileSize;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ Random rand = new Random(clientId.hashCode());
+ long endTime = System.currentTimeMillis() + 10000;
+ int numBlocksRead = 0;
+ int numPositionalRead = 0;
+ int numWithOnDiskSize = 0;
+ while (System.currentTimeMillis() < endTime) {
+ int blockId = rand.nextInt(NUM_TEST_BLOCKS);
+ long offset = offsets.get(blockId);
+ boolean pread = rand.nextBoolean();
+ boolean withOnDiskSize = rand.nextBoolean();
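+ // Exercise both reader paths at random: on-disk size supplied by the
+ // caller vs. read from the block header.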
+ long expectedSize =
+ (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
+ : offsets.get(blockId + 1)) - offset;
+
+ HFileBlock b;
+ try {
+ long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
+ b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
+ } catch (IOException ex) {
+ LOG.error("Error in client " + clientId + " trying to read block at "
+ + offset + ", pread=" + pread + ", withOnDiskSize=" +
+ withOnDiskSize, ex);
+ return false;
+ }
+
+ assertEquals(types.get(blockId), b.getBlockType());
+ assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
+ assertEquals(offset, b.getOffset());
+
+ ++numBlocksRead;
+ if (pread)
+ ++numPositionalRead;
+ if (withOnDiskSize)
+ ++numWithOnDiskSize;
+ }
+ LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
+ " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
+ "specified: " + numWithOnDiskSize + ")");
+ return true;
+ }
+
+ }
+
+ @Test
+ public void testConcurrentReading() throws Exception {
+ for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
+ Path path =
+ new Path(HBaseTestingUtility.getTestDir(), "concurrent_reading");
+ Random rand = defaultRandom();
+ List<Long> offsets = new ArrayList<Long>();
+ List<BlockType> types = new ArrayList<BlockType>();
+ writeBlocks(rand, compressAlgo, path, offsets, null, types, null, false);
+ FSDataInputStream is = fs.open(path);
+ long fileSize = fs.getFileStatus(path).getLen();
+ HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, compressAlgo,
+ fileSize);
+
+ ExecutorService exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
+ ExecutorCompletionService<Boolean> ecs =
+ new ExecutorCompletionService<Boolean>(exec);
+
+ for (int i = 0; i < NUM_READER_THREADS; ++i) {
+ ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
+ offsets, types, fileSize));
+ }
+
+ for (int i = 0; i < NUM_READER_THREADS; ++i) {
+ Future<Boolean> result = ecs.take();
+ assertTrue(result.get());
+ LOG.info(String.valueOf(i + 1)
+ + " reader threads finished successfully (algo=" + compressAlgo
+ + ")");
+ }
+
+ exec.shutdown();
+ is.close();
+ }
+ }
+
+ private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
+ Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
+ List<BlockType> expectedTypes, List<ByteBuffer> expectedContents,
+ boolean detailedLogging) throws IOException {
+ boolean cacheOnWrite = expectedContents != null;
+ FSDataOutputStream os = fs.create(path);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
+ Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
+ long totalSize = 0;
+ for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
+ int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+ BlockType bt = BlockType.values()[blockTypeOrdinal];
+ DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
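+ // With cacheOnWrite the writer keeps an uncompressed copy of the block
+ // for getUncompressedBufferWithHeader() below.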
+ for (int j = 0; j < rand.nextInt(500); ++j) {
+ // This might compress well.
+ dos.writeShort(i + 1);
+ dos.writeInt(j + 1);
+ }
+
+ if (expectedOffsets != null)
+ expectedOffsets.add(os.getPos());
+
+ if (expectedPrevOffsets != null) {
+ Long prevOffset = prevOffsetByType.get(bt);
+ expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
+ prevOffsetByType.put(bt, os.getPos());
+ }
+
+ expectedTypes.add(bt);
+
+ hbw.writeHeaderAndData(os);
+ totalSize += hbw.getOnDiskSizeWithHeader();
+
+ if (cacheOnWrite)
+ expectedContents.add(hbw.getUncompressedBufferWithHeader());
+
+ if (detailedLogging) {
+ LOG.info("Writing block #" + i + " of type " + bt
+ + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
+ + " at offset " + os.getPos());
+ }
+ }
+ os.close();
+ LOG.info("Created a temporary file at " + path + ", "
+ + fs.getFileStatus(path).getLen() + " bytes, compression=" +
+ compressAlgo);
+ return totalSize;
+ }
+
+ @Test
+ public void testBlockHeapSize() {
+ for (int size : new int[] { 100, 256, 12345 }) {
+ byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
+ ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
+ HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
+ true, -1);
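+ // Pin down the precomputed ByteBuffer heap overhead used in HFileBlock's
+ // heap-size accounting before checking the full estimate.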
+ assertEquals(80, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
+ long expected = ClassSize.align(ClassSize.estimateBase(HFileBlock.class,
+ true)
+ + ClassSize.estimateBase(buf.getClass(), true)
+ + HFileBlock.HEADER_SIZE + size);
+ assertEquals(expected, block.heapSize());
+ }
+ }
+
+}
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java?rev=1181554&r1=1181553&r2=1181554&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java Tue Oct 11 02:19:21 2011
@@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -43,11 +42,11 @@ public class TestLruBlockCache extends T
LruBlockCache cache = new LruBlockCache(maxSize,blockSize);
- Block [] blocks = generateFixedBlocks(10, blockSize, "block");
+ CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
// Add all the blocks
- for(Block block : blocks) {
- cache.cacheBlock(block.blockName, block.buf);
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.blockName, block);
}
// Let the eviction run
@@ -70,35 +69,35 @@ public class TestLruBlockCache extends T
LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
- Block [] blocks = generateRandomBlocks(100, blockSize);
+ CachedItem [] blocks = generateRandomBlocks(100, blockSize);
long expectedCacheSize = cache.heapSize();
// Confirm empty
- for(Block block : blocks) {
+ for (CachedItem block : blocks) {
assertTrue(cache.getBlock(block.blockName) == null);
}
// Add blocks
- for(Block block : blocks) {
- cache.cacheBlock(block.blockName, block.buf);
- expectedCacheSize += block.heapSize();
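+ // The cache wraps each item in a CachedBlock, so the expected growth is
+ // the full per-block footprint, not just the item's own heapSize().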
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.blockName, block);
+ expectedCacheSize += block.cacheBlockHeapSize();
}
// Verify correctly calculated cache heap size
assertEquals(expectedCacheSize, cache.heapSize());
// Check if all blocks are properly cached and retrieved
- for(Block block : blocks) {
- ByteBuffer buf = cache.getBlock(block.blockName);
+ for (CachedItem block : blocks) {
+ HeapSize buf = cache.getBlock(block.blockName);
assertTrue(buf != null);
- assertEquals(buf.capacity(), block.buf.capacity());
+ assertEquals(buf.heapSize(), block.heapSize());
}
// Re-add same blocks and ensure nothing has changed
- for(Block block : blocks) {
+ for (CachedItem block : blocks) {
try {
- cache.cacheBlock(block.blockName, block.buf);
+ cache.cacheBlock(block.blockName, block);
assertTrue("Cache should not allow re-caching a block", false);
} catch(RuntimeException re) {
// expected
@@ -109,10 +108,10 @@ public class TestLruBlockCache extends T
assertEquals(expectedCacheSize, cache.heapSize());
// Check if all blocks are properly cached and retrieved
- for(Block block : blocks) {
- ByteBuffer buf = cache.getBlock(block.blockName);
+ for (CachedItem block : blocks) {
+ HeapSize buf = cache.getBlock(block.blockName);
assertTrue(buf != null);
- assertEquals(buf.capacity(), block.buf.capacity());
+ assertEquals(buf.heapSize(), block.heapSize());
}
// Expect no evictions
@@ -126,14 +125,14 @@ public class TestLruBlockCache extends T
LruBlockCache cache = new LruBlockCache(maxSize,blockSize,false);
- Block [] blocks = generateFixedBlocks(10, blockSize, "block");
+ CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
long expectedCacheSize = cache.heapSize();
// Add all the blocks
- for(Block block : blocks) {
- cache.cacheBlock(block.blockName, block.buf);
- expectedCacheSize += block.heapSize();
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.blockName, block);
+ expectedCacheSize += block.cacheBlockHeapSize();
}
// A single eviction run should have occurred
@@ -155,7 +154,7 @@ public class TestLruBlockCache extends T
assertTrue(cache.getBlock(blocks[1].blockName) == null);
for(int i=2;i<blocks.length;i++) {
assertEquals(cache.getBlock(blocks[i].blockName),
- blocks[i].buf);
+ blocks[i]);
}
}
@@ -166,21 +165,21 @@ public class TestLruBlockCache extends T
LruBlockCache cache = new LruBlockCache(maxSize,blockSize,false);
- Block [] singleBlocks = generateFixedBlocks(5, 10000, "single");
- Block [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
+ CachedItem [] singleBlocks = generateFixedBlocks(5, 10000, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
long expectedCacheSize = cache.heapSize();
// Add and get the multi blocks
- for(Block block : multiBlocks) {
- cache.cacheBlock(block.blockName, block.buf);
- expectedCacheSize += block.heapSize();
- assertEquals(cache.getBlock(block.blockName), block.buf);
+ for (CachedItem block : multiBlocks) {
+ cache.cacheBlock(block.blockName, block);
+ expectedCacheSize += block.cacheBlockHeapSize();
+ assertEquals(cache.getBlock(block.blockName), block);
}
// Add the single blocks (no get)
- for(Block block : singleBlocks) {
- cache.cacheBlock(block.blockName, block.buf);
+ for (CachedItem block : singleBlocks) {
+ cache.cacheBlock(block.blockName, block);
expectedCacheSize += block.heapSize();
}
@@ -211,9 +210,9 @@ public class TestLruBlockCache extends T
// And all others to be cached
for(int i=1;i<4;i++) {
assertEquals(cache.getBlock(singleBlocks[i].blockName),
- singleBlocks[i].buf);
+ singleBlocks[i]);
assertEquals(cache.getBlock(multiBlocks[i].blockName),
- multiBlocks[i].buf);
+ multiBlocks[i]);
}
}
@@ -233,9 +232,9 @@ public class TestLruBlockCache extends T
0.34f);// memory
- Block [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
- Block [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
- Block [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
+ CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+ CachedItem [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
long expectedCacheSize = cache.heapSize();
@@ -243,17 +242,17 @@ public class TestLruBlockCache extends T
for(int i=0;i<3;i++) {
// Just add single blocks
- cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
- expectedCacheSize += singleBlocks[i].heapSize();
+ cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
+ expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
// Add and get multi blocks
- cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
- expectedCacheSize += multiBlocks[i].heapSize();
+ cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i]);
+ expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
cache.getBlock(multiBlocks[i].blockName);
// Add memory blocks as such
- cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
- expectedCacheSize += memoryBlocks[i].heapSize();
+ cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i], true);
+ expectedCacheSize += memoryBlocks[i].cacheBlockHeapSize();
}
@@ -264,7 +263,7 @@ public class TestLruBlockCache extends T
assertEquals(expectedCacheSize, cache.heapSize());
// Insert a single block, oldest single should be evicted
- cache.cacheBlock(singleBlocks[3].blockName, singleBlocks[3].buf);
+ cache.cacheBlock(singleBlocks[3].blockName, singleBlocks[3]);
// Single eviction, one thing evicted
assertEquals(1, cache.getEvictionCount());
@@ -277,7 +276,7 @@ public class TestLruBlockCache extends T
cache.getBlock(singleBlocks[1].blockName);
// Insert another single block
- cache.cacheBlock(singleBlocks[4].blockName, singleBlocks[4].buf);
+ cache.cacheBlock(singleBlocks[4].blockName, singleBlocks[4]);
// Two evictions, two evicted.
assertEquals(2, cache.getEvictionCount());
@@ -287,7 +286,7 @@ public class TestLruBlockCache extends T
assertEquals(null, cache.getBlock(multiBlocks[0].blockName));
// Insert another memory block
- cache.cacheBlock(memoryBlocks[3].blockName, memoryBlocks[3].buf, true);
+ cache.cacheBlock(memoryBlocks[3].blockName, memoryBlocks[3], true);
// Three evictions, three evicted.
assertEquals(3, cache.getEvictionCount());
@@ -297,8 +296,8 @@ public class TestLruBlockCache extends T
assertEquals(null, cache.getBlock(memoryBlocks[0].blockName));
// Add a block that is twice as big (should force two evictions)
- Block [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
- cache.cacheBlock(bigBlocks[0].blockName, bigBlocks[0].buf);
+ CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
+ cache.cacheBlock(bigBlocks[0].blockName, bigBlocks[0]);
// Four evictions, six evicted (inserted block 3X size, expect +3 evicted)
assertEquals(4, cache.getEvictionCount());
@@ -313,7 +312,7 @@ public class TestLruBlockCache extends T
cache.getBlock(bigBlocks[0].blockName);
// Cache another single big block
- cache.cacheBlock(bigBlocks[1].blockName, bigBlocks[1].buf);
+ cache.cacheBlock(bigBlocks[1].blockName, bigBlocks[1]);
// Five evictions, nine evicted (3 new)
assertEquals(5, cache.getEvictionCount());
@@ -325,7 +324,7 @@ public class TestLruBlockCache extends T
assertEquals(null, cache.getBlock(multiBlocks[2].blockName));
// Cache a big memory block
- cache.cacheBlock(bigBlocks[2].blockName, bigBlocks[2].buf, true);
+ cache.cacheBlock(bigBlocks[2].blockName, bigBlocks[2], true);
// Six evictions, twelve evicted (3 new)
assertEquals(6, cache.getEvictionCount());
@@ -355,18 +354,18 @@ public class TestLruBlockCache extends T
0.33f, // multi
0.34f);// memory
- Block [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
- Block [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+ CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
// Add 5 multi blocks
- for(Block block : multiBlocks) {
- cache.cacheBlock(block.blockName, block.buf);
+ for (CachedItem block : multiBlocks) {
+ cache.cacheBlock(block.blockName, block);
cache.getBlock(block.blockName);
}
// Add 5 single blocks
for(int i=0;i<5;i++) {
- cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
+ cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
}
// An eviction ran
@@ -389,7 +388,7 @@ public class TestLruBlockCache extends T
// 12 more evicted.
for(int i=5;i<18;i++) {
- cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
+ cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
}
// 4 total evictions, 16 total evicted
@@ -417,22 +416,22 @@ public class TestLruBlockCache extends T
0.33f, // multi
0.34f);// memory
- Block [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
- Block [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
- Block [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
+ CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
+ CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
// Add all blocks from all priorities
for(int i=0;i<10;i++) {
// Just add single blocks
- cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
+ cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
// Add and get multi blocks
- cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
+ cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i]);
cache.getBlock(multiBlocks[i].blockName);
// Add memory blocks as such
- cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
+ cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i], true);
}
// Do not expect any evictions yet
@@ -456,29 +455,29 @@ public class TestLruBlockCache extends T
// And the newest 5 blocks should still be accessible
for(int i=5;i<10;i++) {
- assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName));
- assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName));
- assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName));
+ assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].blockName));
+ assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].blockName));
+ assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].blockName));
}
}
- private Block [] generateFixedBlocks(int numBlocks, int size, String pfx) {
- Block [] blocks = new Block[numBlocks];
+ private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
+ CachedItem [] blocks = new CachedItem[numBlocks];
for(int i=0;i<numBlocks;i++) {
- blocks[i] = new Block(pfx + i, size);
+ blocks[i] = new CachedItem(pfx + i, size);
}
return blocks;
}
- private Block [] generateFixedBlocks(int numBlocks, long size, String pfx) {
+ private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
return generateFixedBlocks(numBlocks, (int)size, pfx);
}
- private Block [] generateRandomBlocks(int numBlocks, long maxSize) {
- Block [] blocks = new Block[numBlocks];
+ private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
+ CachedItem [] blocks = new CachedItem[numBlocks];
Random r = new Random();
for(int i=0;i<numBlocks;i++) {
- blocks[i] = new Block("block" + i, r.nextInt((int)maxSize)+1);
+ blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
}
return blocks;
}
@@ -508,19 +507,26 @@ public class TestLruBlockCache extends T
LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
}
- private static class Block implements HeapSize {
+ private static class CachedItem implements HeapSize {
String blockName;
- ByteBuffer buf;
+ int size;
- Block(String blockName, int size) {
+ CachedItem(String blockName, int size) {
this.blockName = blockName;
- this.buf = ByteBuffer.allocate(size);
+ this.size = size;
}
+ /** The size of this item reported to the block cache layer */
+ @Override
public long heapSize() {
- return CachedBlock.PER_BLOCK_OVERHEAD +
- ClassSize.align(blockName.length()) +
- ClassSize.align(buf.capacity());
+ return ClassSize.align(size);
+ }
+
+ /** Size of the cache block holding this item. Used for verification. */
+ public long cacheBlockHeapSize() {
+ return CachedBlock.PER_BLOCK_OVERHEAD
+ + ClassSize.align(blockName.length())
+ + ClassSize.align(size);
}
}
}