Posted to commits@hbase.apache.org by mb...@apache.org on 2012/01/26 03:59:00 UTC
svn commit: r1236031 [5/7] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/io/encoding/
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/mapr...
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Uses the load tester to verify switching encode-on-disk off on a live table.
+ */
+@Category(MediumTests.class)
+public class TestLoadAndSwitchEncodeOnDisk extends
+ TestMiniClusterLoadSequential {
+
+ /** We do not alternate the multi-put flag in this test. */
+ private static final boolean USE_MULTI_PUT = true;
+
+ /** Un-parameterize the test */
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][]{ new Object[0] });
+ }
+
+ public TestLoadAndSwitchEncodeOnDisk() {
+ super(USE_MULTI_PUT, DataBlockEncoding.PREFIX);
+ conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
+ }
+
+ protected int numKeys() {
+ return 3000;
+ }
+
+ @Test(timeout=TIMEOUT_MS)
+ public void loadTest() throws Exception {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ compression = Compression.Algorithm.GZ; // used for table setup
+ super.loadTest();
+
+ HColumnDescriptor hcd = getColumnDesc(admin);
+ System.err.println("\nDisabling encode-on-disk. Old column descriptor: " +
+ hcd + "\n");
+ admin.disableTable(TABLE);
+ hcd.setEncodeOnDisk(false);
+ admin.modifyColumn(TABLE, hcd);
+
+ System.err.println("\nRe-enabling table\n");
+ admin.enableTable(TABLE);
+
+ System.err.println("\nNew column descriptor: " +
+ getColumnDesc(admin) + "\n");
+
+ System.err.println("\nCompacting the table\n");
+ admin.majorCompact(TABLE);
+ // Wait until compaction completes
+ Threads.sleepWithoutInterrupt(5000);
+ HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+ while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+ Threads.sleep(50);
+ }
+
+ System.err.println("\nDone with the test, shutting down the cluster\n");
+ }
+
+}
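
For reference, the online schema change exercised by loadTest() above reduces to the following sketch. It mirrors the HBaseAdmin calls used in the test; the class name, method name and arguments below are illustrative placeholders rather than part of this commit, and waiting for the major compaction to finish is left out (the test polls the region server's compaction queue instead).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class EncodeOnDiskToggleSketch {
  /** Turns encode-on-disk off for one column family and rewrites its store files. */
  static void disableEncodeOnDisk(Configuration conf, byte[] tableName,
      HColumnDescriptor hcd) throws Exception {
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.disableTable(tableName);       // schema changes require the table to be offline
    hcd.setEncodeOnDisk(false);          // keep blocks encoded in cache only, raw on disk
    admin.modifyColumn(tableName, hcd);  // push the updated column descriptor
    admin.enableTable(tableName);
    admin.majorCompact(tableName);       // rewrite existing store files without encoding
  }
}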
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF;
+import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF_BYTES;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestUpgradeFromHFileV1ToEncoding {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestUpgradeFromHFileV1ToEncoding.class);
+
+ private static final String TABLE = "UpgradeTable";
+ private static final byte[] TABLE_BYTES = Bytes.toBytes(TABLE);
+
+ private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Configuration conf = TEST_UTIL.getConfiguration();
+
+ private static final int NUM_HFILE_V1_BATCHES = 10;
+ private static final int NUM_HFILE_V2_BATCHES = 20;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Use a small flush size to create more HFiles.
+ conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+ conf.setInt(HFile.FORMAT_VERSION_KEY, 1); // Use HFile v1 initially
+ TEST_UTIL.startMiniCluster();
+ LOG.debug("Started an HFile v1 cluster");
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testUpgrade() throws Exception {
+ int numBatches = 0;
+ HTableDescriptor htd = new HTableDescriptor(TABLE);
+ HColumnDescriptor hcd = new HColumnDescriptor(CF);
+ htd.addFamily(hcd);
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.createTable(htd);
+ admin.close();
+ for (int i = 0; i < NUM_HFILE_V1_BATCHES; ++i) {
+ TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++);
+ }
+ TEST_UTIL.shutdownMiniHBaseCluster();
+
+ conf.setInt(HFile.FORMAT_VERSION_KEY, 2);
+ TEST_UTIL.startMiniHBaseCluster(1, 1);
+ LOG.debug("Started an HFile v2 cluster");
+ admin = new HBaseAdmin(conf);
+ htd = admin.getTableDescriptor(TABLE_BYTES);
+ hcd = htd.getFamily(CF_BYTES);
+ hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
+ admin.disableTable(TABLE);
+ admin.modifyColumn(TABLE, hcd);
+ admin.enableTable(TABLE);
+ admin.close();
+ for (int i = 0; i < NUM_HFILE_V2_BATCHES; ++i) {
+ TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++);
+ }
+
+ LOG.debug("Verifying all 'batches', both HFile v1 and encoded HFile v2");
+ verifyBatches(numBatches);
+
+ LOG.debug("Doing a manual compaction");
+ admin.compact(TABLE);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+ LOG.debug("Verify all the data again");
+ verifyBatches(numBatches);
+ }
+
+ private void verifyBatches(int numBatches) throws Exception {
+ for (int i = 0; i < numBatches; ++i) {
+ TestChangingEncoding.verifyTestDataBatch(conf, TABLE, i);
+ }
+ }
+
+}
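
The upgrade path above hinges on two switches: hfile.format.version (HFile.FORMAT_VERSION_KEY) selects the HFile writer version, and the column family's data block encoding attribute turns PREFIX encoding on once the cluster is back up on HFile v2. A minimal sketch of that second step, assuming the same client API the test uses (the class and method names are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class EnablePrefixEncodingSketch {
  /** Switches an existing column family to PREFIX data block encoding. */
  static void enablePrefixEncoding(Configuration conf, String tableName,
      HColumnDescriptor hcd) throws Exception {
    HBaseAdmin admin = new HBaseAdmin(conf);
    hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
    admin.disableTable(tableName);
    admin.modifyColumn(tableName, hcd);  // files written after this point use PREFIX
    admin.enableTable(tableName);
    admin.close();                       // older v1/unencoded files remain readable
  }
}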
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Thu Jan 26 02:58:57 2012
@@ -41,9 +41,15 @@ import org.apache.hadoop.hbase.regionser
public class CacheTestUtils {
- /*Just checks if heapsize grows when something is cached, and gets smaller when the same object is evicted*/
+ private static final boolean includesMemstoreTS = true;
- public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize){
+ /**
+ * Just checks if heapsize grows when something is cached, and gets smaller
+ * when the same object is evicted
+ */
+
+ public static void testHeapSizeChanges(final BlockCache toBeTested,
+ final int blockSize) {
HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
long heapSize = ((HeapSize) toBeTested).heapSize();
toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
@@ -316,7 +322,8 @@ public class CacheTestUtils {
HFileBlock generated = new HFileBlock(BlockType.DATA,
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
- prevBlockOffset, cachedBuffer, false, blockSize);
+ prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
+ blockSize, includesMemstoreTS);
String strKey;
/* No conflicting keys */
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Thu Jan 26 02:58:57 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -74,10 +74,13 @@ public class TestCacheOnWrite {
private FileSystem fs;
private Random rand = new Random(12983177L);
private Path storeFilePath;
- private Compression.Algorithm compress;
- private CacheOnWriteType cowType;
private BlockCache blockCache;
- private String testName;
+ private String testDescription;
+
+ private final CacheOnWriteType cowType;
+ private final Compression.Algorithm compress;
+ private final BlockEncoderTestType encoderType;
+ private final HFileDataBlockEncoder encoder;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 25000;
@@ -90,49 +93,87 @@ public class TestCacheOnWrite {
KeyValue.Type.values().length - 2;
private static enum CacheOnWriteType {
- DATA_BLOCKS(BlockType.DATA, CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY),
- BLOOM_BLOCKS(BlockType.BLOOM_CHUNK,
- CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY),
- INDEX_BLOCKS(BlockType.LEAF_INDEX,
- CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY);
+ DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
+ BlockType.DATA, BlockType.ENCODED_DATA),
+ BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+ BlockType.BLOOM_CHUNK),
+ INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+ BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);
private final String confKey;
- private final BlockType inlineBlockType;
+ private final BlockType blockType1;
+ private final BlockType blockType2;
+
+ private CacheOnWriteType(String confKey, BlockType blockType) {
+ this(confKey, blockType, blockType);
+ }
- private CacheOnWriteType(BlockType inlineBlockType, String confKey) {
- this.inlineBlockType = inlineBlockType;
+ private CacheOnWriteType(String confKey, BlockType blockType1,
+ BlockType blockType2) {
+ this.blockType1 = blockType1;
+ this.blockType2 = blockType2;
this.confKey = confKey;
}
public boolean shouldBeCached(BlockType blockType) {
- return blockType == inlineBlockType
- || blockType == BlockType.INTERMEDIATE_INDEX
- && inlineBlockType == BlockType.LEAF_INDEX;
+ return blockType == blockType1 || blockType == blockType2;
}
public void modifyConf(Configuration conf) {
- for (CacheOnWriteType cowType : CacheOnWriteType.values())
+ for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
conf.setBoolean(cowType.confKey, cowType == this);
+ }
}
}
+ private static final DataBlockEncoding ENCODING_ALGO =
+ DataBlockEncoding.PREFIX;
+
+ /** Provides fancy names for three combinations of two booleans */
+ private static enum BlockEncoderTestType {
+ NO_BLOCK_ENCODING(false, false),
+ BLOCK_ENCODING_IN_CACHE_ONLY(false, true),
+ BLOCK_ENCODING_EVERYWHERE(true, true);
+
+ private final boolean encodeOnDisk;
+ private final boolean encodeInCache;
+
+ BlockEncoderTestType(boolean encodeOnDisk, boolean encodeInCache) {
+ this.encodeOnDisk = encodeOnDisk;
+ this.encodeInCache = encodeInCache;
+ }
+
+ public HFileDataBlockEncoder getEncoder() {
+ return new HFileDataBlockEncoderImpl(
+ encodeOnDisk ? ENCODING_ALGO : DataBlockEncoding.NONE,
+ encodeInCache ? ENCODING_ALGO : DataBlockEncoding.NONE);
+ }
+ }
+
public TestCacheOnWrite(CacheOnWriteType cowType,
- Compression.Algorithm compress) {
+ Compression.Algorithm compress, BlockEncoderTestType encoderType) {
this.cowType = cowType;
this.compress = compress;
- testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]";
- System.out.println(testName);
+ this.encoderType = encoderType;
+ this.encoder = encoderType.getEncoder();
+ testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
+ ", encoderType=" + encoderType + "]";
+ System.out.println(testDescription);
}
@Parameters
public static Collection<Object[]> getParameters() {
List<Object[]> cowTypes = new ArrayList<Object[]>();
- for (CacheOnWriteType cowType : CacheOnWriteType.values())
+ for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
for (Compression.Algorithm compress :
HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
- cowTypes.add(new Object[] { cowType, compress });
+ for (BlockEncoderTestType encoderType :
+ BlockEncoderTestType.values()) {
+ cowTypes.add(new Object[] { cowType, compress, encoderType });
+ }
}
+ }
return cowTypes;
}
@@ -153,7 +194,6 @@ public class TestCacheOnWrite {
fs = FileSystem.get(conf);
cacheConf = new CacheConfig(conf);
blockCache = cacheConf.getBlockCache();
- System.out.println("setUp()");
}
@After
@@ -169,29 +209,43 @@ public class TestCacheOnWrite {
}
private void readStoreFile() throws IOException {
- HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
- storeFilePath, cacheConf);
+ HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs,
+ storeFilePath, cacheConf, encoder.getEncodingInCache());
LOG.info("HFile information: " + reader);
- HFileScanner scanner = reader.getScanner(false, false);
- assertTrue(testName, scanner.seekTo());
+ final boolean cacheBlocks = false;
+ final boolean pread = false;
+ HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
+ assertTrue(testDescription, scanner.seekTo());
long offset = 0;
HFileBlock prevBlock = null;
EnumMap<BlockType, Integer> blockCountByType =
new EnumMap<BlockType, Integer>(BlockType.class);
+ DataBlockEncoding encodingInCache =
+ encoderType.getEncoder().getEncodingInCache();
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
// Flags: don't cache the block, use pread, this is not a compaction.
+ // Also, pass null for expected block type to avoid checking it.
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
- false);
- BlockCacheKey blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset);
+ false, null);
+ BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
+ offset, encodingInCache, block.getBlockType());
boolean isCached = blockCache.getBlock(blockCacheKey, true) != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
- assertEquals(testName + " " + block, shouldBeCached, isCached);
+ if (shouldBeCached != isCached) {
+ throw new AssertionError(
+ "shouldBeCached: " + shouldBeCached+ "\n" +
+ "isCached: " + isCached + "\n" +
+ "Test description: " + testDescription + "\n" +
+ "block: " + block + "\n" +
+ "encodingInCache: " + encodingInCache + "\n" +
+ "blockCacheKey: " + blockCacheKey);
+ }
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
BlockType bt = block.getBlockType();
@@ -201,8 +255,10 @@ public class TestCacheOnWrite {
LOG.info("Block count by type: " + blockCountByType);
String countByType = blockCountByType.toString();
- assertEquals(
- "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+ BlockType cachedDataBlockType =
+ encoderType.encodeInCache ? BlockType.ENCODED_DATA : BlockType.DATA;
+ assertEquals("{" + cachedDataBlockType
+ + "=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
countByType);
reader.close();
@@ -228,7 +284,7 @@ public class TestCacheOnWrite {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
"test_cache_on_write");
StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
- DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
+ DATA_BLOCK_SIZE, compress, encoder, KeyValue.COMPARATOR, conf,
cacheConf, BLOOM_TYPE, NUM_KV);
final int rowLen = 32;
@@ -260,8 +316,9 @@ public class TestCacheOnWrite {
final byte[] cfBytes = Bytes.toBytes(cf);
final int maxVersions = 3;
HRegion region = TEST_UTIL.createTestRegion(table, cf, compress,
- BLOOM_TYPE, maxVersions, HColumnDescriptor.DEFAULT_BLOCKCACHE,
- HFile.DEFAULT_BLOCKSIZE);
+ BLOOM_TYPE, maxVersions, HFile.DEFAULT_BLOCKSIZE,
+ encoder.getEncodingInCache(),
+ encoder.getEncodingOnDisk() != DataBlockEncoding.NONE);
int rowIdx = 0;
long ts = EnvironmentEdgeManager.currentTimeMillis();
for (int iFile = 0; iFile < 5; ++iFile) {
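
The new BlockEncoderTestType values above cover three of the four possible on-disk/in-cache combinations. Spelled out against the two-argument HFileDataBlockEncoderImpl constructor that getEncoder() uses (on-disk encoding first, in-cache encoding second; this ordering is taken from the code above, and the sketch is only illustrative):

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

public class EncoderCombinationsSketch {
  public static void main(String[] args) {
    // NO_BLOCK_ENCODING: raw blocks both on disk and in the block cache.
    HFileDataBlockEncoder none = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.NONE, DataBlockEncoding.NONE);
    // BLOCK_ENCODING_IN_CACHE_ONLY: raw on disk, PREFIX-encoded in the cache.
    HFileDataBlockEncoder inCacheOnly = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.NONE, DataBlockEncoding.PREFIX);
    // BLOCK_ENCODING_EVERYWHERE: PREFIX-encoded on disk and in the cache.
    HFileDataBlockEncoder everywhere = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.PREFIX, DataBlockEncoding.PREFIX);
    System.out.println(none + ", " + inCacheOnly + ", " + everywhere);
  }
}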
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java Thu Jan 26 02:58:57 2012
@@ -158,7 +158,7 @@ public class TestHFile extends HBaseTest
writeRecords(writer);
fout.close();
FSDataInputStream fin = fs.open(ncTFile);
- Reader reader = HFile.createReader(ncTFile, fs.open(ncTFile),
+ Reader reader = HFile.createReaderFromStream(ncTFile, fs.open(ncTFile),
fs.getFileStatus(ncTFile).getLen(), cacheConf);
System.out.println(cacheConf.toString());
// Load up the index.
@@ -236,7 +236,7 @@ public class TestHFile extends HBaseTest
writer.close();
fout.close();
FSDataInputStream fin = fs.open(mFile);
- Reader reader = HFile.createReader(mFile, fs.open(mFile),
+ Reader reader = HFile.createReaderFromStream(mFile, fs.open(mFile),
this.fs.getFileStatus(mFile).getLen(), cacheConf);
reader.loadFileInfo();
// No data -- this should return false.
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Thu Jan 26 02:58:57 2012
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,16 +47,24 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
@Category(MediumTests.class)
+@RunWith(Parameterized.class)
public class TestHFileBlock {
// change this value to activate more logs
private static final boolean detailedLogging = false;
@@ -69,14 +79,29 @@ public class TestHFileBlock {
static final Compression.Algorithm[] GZIP_ONLY = { GZ };
private static final int NUM_TEST_BLOCKS = 1000;
-
private static final int NUM_READER_THREADS = 26;
+ // Used to generate KeyValues
+ private static int NUM_KEYVALUES = 50;
+ private static int FIELD_LENGTH = 10;
+ private static float CHANCE_TO_REPEAT = 0.6f;
+
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private FileSystem fs;
private int uncompressedSizeV1;
+ private final boolean includesMemstoreTS;
+
+ public TestHFileBlock(boolean includesMemstoreTS) {
+ this.includesMemstoreTS = includesMemstoreTS;
+ }
+
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+ }
+
@Before
public void setUp() throws IOException {
fs = FileSystem.get(TEST_UTIL.getConfiguration());
@@ -88,6 +113,72 @@ public class TestHFileBlock {
dos.writeInt(i / 100);
}
+ private int writeTestKeyValues(OutputStream dos, int seed)
+ throws IOException {
+ List<KeyValue> keyValues = new ArrayList<KeyValue>();
+    Random randomizer = new Random(42L + seed); // just any fixed number
+
+ // generate keyValues
+ for (int i = 0; i < NUM_KEYVALUES; ++i) {
+ byte[] row;
+ long timestamp;
+ byte[] family;
+ byte[] qualifier;
+ byte[] value;
+
+ // generate it or repeat, it should compress well
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
+ } else {
+ row = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(row);
+ }
+ if (0 == i) {
+ family = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(family);
+ } else {
+ family = keyValues.get(0).getFamily();
+ }
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ qualifier = keyValues.get(
+ randomizer.nextInt(keyValues.size())).getQualifier();
+ } else {
+ qualifier = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(qualifier);
+ }
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
+ } else {
+ value = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(value);
+ }
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ timestamp = keyValues.get(
+ randomizer.nextInt(keyValues.size())).getTimestamp();
+ } else {
+ timestamp = randomizer.nextLong();
+ }
+
+ keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
+ }
+
+ // sort it and write to stream
+ int totalSize = 0;
+ Collections.sort(keyValues, KeyValue.COMPARATOR);
+ DataOutputStream dataOutputStream = new DataOutputStream(dos);
+ for (KeyValue kv : keyValues) {
+ totalSize += kv.getLength();
+ dataOutputStream.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+ if (includesMemstoreTS) {
+ long memstoreTS = randomizer.nextLong();
+ WritableUtils.writeVLong(dataOutputStream, memstoreTS);
+ totalSize += WritableUtils.getVIntSize(memstoreTS);
+ }
+ }
+
+ return totalSize;
+ }
+
public byte[] createTestV1Block(Compression.Algorithm algo)
throws IOException {
Compressor compressor = algo.getCompressor();
@@ -105,8 +196,9 @@ public class TestHFileBlock {
private byte[] createTestV2Block(Compression.Algorithm algo)
throws IOException {
final BlockType blockType = BlockType.DATA;
- HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
- DataOutputStream dos = hbw.startWriting(blockType, false);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+ includesMemstoreTS);
+ DataOutputStream dos = hbw.startWriting(blockType);
writeTestBlockContents(dos);
byte[] headerAndData = hbw.getHeaderAndData();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
@@ -194,10 +286,11 @@ public class TestHFileBlock {
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
- HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+ includesMemstoreTS);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
- DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
+ DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1234; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
@@ -240,6 +333,136 @@ public class TestHFileBlock {
}
}
+ /**
+ * Test encoding/decoding data blocks.
+   * @throws IOException if there is a bug or a problem with temporary files.
+ */
+ @Test
+ public void testDataBlockEncoding() throws IOException {
+ final int numBlocks = 5;
+ for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+ for (boolean pread : new boolean[] { false, true }) {
+ for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+ Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ + algo + "_" + encoding.toString());
+ FSDataOutputStream os = fs.create(path);
+ HFileDataBlockEncoder dataBlockEncoder =
+ new HFileDataBlockEncoderImpl(encoding);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo, dataBlockEncoder,
+ includesMemstoreTS);
+ long totalSize = 0;
+ final List<Integer> encodedSizes = new ArrayList<Integer>();
+ final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
+ for (int blockId = 0; blockId < numBlocks; ++blockId) {
+ writeEncodedBlock(encoding, hbw, encodedSizes, encodedBlocks,
+ blockId);
+
+ hbw.writeHeaderAndData(os);
+ totalSize += hbw.getOnDiskSizeWithHeader();
+ }
+ os.close();
+
+ FSDataInputStream is = fs.open(path);
+ HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, algo,
+ totalSize);
+ hbr.setDataBlockEncoder(dataBlockEncoder);
+ hbr.setIncludesMemstoreTS(includesMemstoreTS);
+
+ HFileBlock b;
+ int pos = 0;
+ for (int blockId = 0; blockId < numBlocks; ++blockId) {
+ b = hbr.readBlockData(pos, -1, -1, pread);
+ b.sanityCheck();
+ pos += b.getOnDiskSizeWithHeader();
+
+ assertEquals((int) encodedSizes.get(blockId),
+ b.getUncompressedSizeWithoutHeader());
+ ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+ if (encoding != DataBlockEncoding.NONE) {
+ // We expect a two-byte big-endian encoding id.
+ assertEquals(0, actualBuffer.get(0));
+ assertEquals(encoding.getId(), actualBuffer.get(1));
+ actualBuffer.position(2);
+ actualBuffer = actualBuffer.slice();
+ }
+
+ ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
+ expectedBuffer.rewind();
+
+ // test if content matches, produce nice message
+ assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
+ pread);
+ }
+ is.close();
+ }
+ }
+ }
+ }
+
+ private void writeEncodedBlock(DataBlockEncoding encoding,
+ HFileBlock.Writer hbw, final List<Integer> encodedSizes,
+ final List<ByteBuffer> encodedBlocks, int blockId) throws IOException {
+ DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DoubleOutputStream doubleOutputStream =
+ new DoubleOutputStream(dos, baos);
+
+ final int rawBlockSize = writeTestKeyValues(doubleOutputStream,
+ blockId);
+
+ ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
+ rawBuf.rewind();
+
+ final int encodedSize;
+ final ByteBuffer encodedBuf;
+ if (encoding == DataBlockEncoding.NONE) {
+ encodedSize = rawBlockSize;
+ encodedBuf = rawBuf;
+ } else {
+ ByteArrayOutputStream encodedOut = new ByteArrayOutputStream();
+ encoding.getEncoder().compressKeyValues(
+ new DataOutputStream(encodedOut),
+ rawBuf.duplicate(), includesMemstoreTS);
+ // We need to account for the two-byte encoding algorithm ID that
+ // comes after the 24-byte block header but before encoded KVs.
+ encodedSize = encodedOut.size() + DataBlockEncoding.ID_SIZE;
+ encodedBuf = ByteBuffer.wrap(encodedOut.toByteArray());
+ }
+ encodedSizes.add(encodedSize);
+ encodedBlocks.add(encodedBuf);
+ }
+
+ private void assertBuffersEqual(ByteBuffer expectedBuffer,
+ ByteBuffer actualBuffer, Compression.Algorithm compression,
+ DataBlockEncoding encoding, boolean pread) {
+ if (!actualBuffer.equals(expectedBuffer)) {
+ int prefix = 0;
+ int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
+ while (prefix < minLimit &&
+ expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
+ prefix++;
+ }
+
+ fail(String.format(
+          "Content mismatch for compression %s, encoding %s, " +
+ "pread %s, commonPrefix %d, expected %s, got %s",
+ compression, encoding, pread, prefix,
+ nextBytesToStr(expectedBuffer, prefix),
+ nextBytesToStr(actualBuffer, prefix)));
+ }
+ }
+
+ /**
+   * Convert the next few bytes in the given buffer at the given position to
+   * a string. Used for error messages.
+ */
+ private static String nextBytesToStr(ByteBuffer buf, int pos) {
+ int maxBytes = buf.limit() - pos;
+ int numBytes = Math.min(16, maxBytes);
+ return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
+ numBytes) + (numBytes < maxBytes ? "..." : "");
+ }
+
@Test
public void testPreviousOffset() throws IOException {
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
@@ -446,13 +669,17 @@ public class TestHFileBlock {
) throws IOException {
boolean cacheOnWrite = expectedContents != null;
FSDataOutputStream os = fs.create(path);
- HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo, null,
+ includesMemstoreTS);
Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
long totalSize = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+ if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+ blockTypeOrdinal = BlockType.DATA.ordinal();
+ }
BlockType bt = BlockType.values()[blockTypeOrdinal];
- DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
+ DataOutputStream dos = hbw.startWriting(bt);
for (int j = 0; j < rand.nextInt(500); ++j) {
// This might compress well.
dos.writeShort(i + 1);
@@ -501,7 +728,7 @@ public class TestHFileBlock {
byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
- true, -1);
+ HFileBlock.FILL_HEADER, -1, includesMemstoreTS);
long byteBufferExpectedSize =
ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
+ HFileBlock.HEADER_SIZE + size);
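
As the hunks above show, HFileBlock.Writer now takes an optional data block encoder plus the includesMemstoreTS flag, and startWriting() no longer takes a cache-on-write argument. A minimal sketch of writing a single unencoded data block with the updated API, modelled on createTestV2Block(); the file system, path and block contents are placeholders:

import java.io.DataOutputStream;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public class HFileBlockWriterSketch {
  static long writeOneBlock(FileSystem fs, Path path) throws Exception {
    FSDataOutputStream os = fs.create(path);
    // null = no data block encoder; true = key/values carry memstore timestamps.
    HFileBlock.Writer hbw = new HFileBlock.Writer(Compression.Algorithm.NONE,
        null, true);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);               // arbitrary, compressible block contents
    }
    hbw.writeHeaderAndData(os);      // flushes header plus block data
    long onDiskSize = hbw.getOnDiskSizeWithHeader();
    os.close();
    return onDiskSize;
  }
}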
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java Thu Jan 26 02:58:57 2012
@@ -20,6 +20,10 @@
package org.apache.hadoop.hbase.io.hfile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -44,7 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
-
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -52,8 +55,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.*;
-
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestHFileBlockIndex {
@@ -92,6 +93,8 @@ public class TestHFileBlockIndex {
private static final int[] UNCOMPRESSED_INDEX_SIZES =
{ 19187, 21813, 23086 };
+ private static final boolean includesMemstoreTS = true;
+
static {
assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length;
assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length;
@@ -138,7 +141,8 @@ public class TestHFileBlockIndex {
@Override
public HFileBlock readBlock(long offset, long onDiskSize,
- boolean cacheBlock, boolean pread, boolean isCompaction)
+ boolean cacheBlock, boolean pread, boolean isCompaction,
+ BlockType expectedBlockType)
throws IOException {
if (offset == prevOffset && onDiskSize == prevOnDiskSize &&
pread == prevPread) {
@@ -210,13 +214,14 @@ public class TestHFileBlockIndex {
private void writeWholeIndex() throws IOException {
assertEquals(0, keys.size());
- HFileBlock.Writer hbw = new HFileBlock.Writer(compr);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(compr, null,
+ includesMemstoreTS);
FSDataOutputStream outputStream = fs.create(path);
HFileBlockIndex.BlockIndexWriter biw =
new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
- hbw.startWriting(BlockType.DATA, false).write(
+ hbw.startWriting(BlockType.DATA).write(
String.valueOf(rand.nextInt(1000)).getBytes());
long blockOffset = outputStream.getPos();
hbw.writeHeaderAndData(outputStream);
@@ -251,7 +256,7 @@ public class TestHFileBlockIndex {
boolean isClosing) throws IOException {
while (biw.shouldWriteBlock(isClosing)) {
long offset = outputStream.getPos();
- biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false));
+ biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType()));
hbw.writeHeaderAndData(outputStream);
biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(),
hbw.getUncompressedSizeWithoutHeader());
@@ -479,7 +484,7 @@ public class TestHFileBlockIndex {
{
HFile.Writer writer =
HFile.getWriterFactory(conf, cacheConf).createWriter(fs,
- hfilePath, SMALL_BLOCK_SIZE, compr, KeyValue.KEY_COMPARATOR);
+ hfilePath, SMALL_BLOCK_SIZE, compr, null, KeyValue.KEY_COMPARATOR);
Random rand = new Random(19231737);
for (int i = 0; i < NUM_KV; ++i) {
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestHFileDataBlockEncoder {
+ private Configuration conf;
+ private final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private HFileDataBlockEncoderImpl blockEncoder;
+ private RedundantKVGenerator generator = new RedundantKVGenerator();
+ private SchemaConfigured UNKNOWN_TABLE_AND_CF =
+ SchemaConfigured.createUnknown();
+ private boolean includesMemstoreTS;
+
+ /**
+   * Create a test for the given data block encoding configuration.
+   * @param blockEncoder What kind of encoding policy will be used.
+   * @param includesMemstoreTS Whether key/values include memstore timestamps.
+ */
+ public TestHFileDataBlockEncoder(HFileDataBlockEncoderImpl blockEncoder,
+ boolean includesMemstoreTS) {
+ this.blockEncoder = blockEncoder;
+ this.includesMemstoreTS = includesMemstoreTS;
+ System.err.println("On-disk encoding: " + blockEncoder.getEncodingOnDisk()
+ + ", in-cache encoding: " + blockEncoder.getEncodingInCache()
+ + ", includesMemstoreTS: " + includesMemstoreTS);
+ }
+
+ /**
+ * Preparation before JUnit test.
+ */
+ @Before
+ public void setUp() {
+ conf = TEST_UTIL.getConfiguration();
+ SchemaMetrics.configureGlobally(conf);
+ }
+
+ /**
+ * Cleanup after JUnit test.
+ */
+ @After
+ public void tearDown() throws IOException {
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ /**
+ * Test putting and taking out blocks into cache with different
+ * encoding options.
+ */
+ @Test
+ public void testEncodingWithCache() {
+ HFileBlock block = getSampleHFileBlock();
+ LruBlockCache blockCache =
+ new LruBlockCache(8 * 1024 * 1024, 32 * 1024);
+ HFileBlock cacheBlock = blockEncoder.diskToCacheFormat(block, false);
+ BlockCacheKey cacheKey = new BlockCacheKey("test", 0);
+ blockCache.cacheBlock(cacheKey, cacheBlock);
+
+ HeapSize heapSize = blockCache.getBlock(cacheKey, false);
+ assertTrue(heapSize instanceof HFileBlock);
+
+    HFileBlock returnedBlock = (HFileBlock) heapSize;
+
+ if (blockEncoder.getEncodingInCache() ==
+ DataBlockEncoding.NONE) {
+ assertEquals(block.getBufferWithHeader(),
+ returnedBlock.getBufferWithHeader());
+ } else {
+ if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
+ System.out.println(blockEncoder);
+ }
+ assertEquals(BlockType.ENCODED_DATA, returnedBlock.getBlockType());
+ }
+ }
+
+ /**
+ * Test writing to disk.
+ */
+ @Test
+ public void testEncodingWritePath() {
+    // Usually we have just a block without a header, but let's not complicate things.
+ HFileBlock block = getSampleHFileBlock();
+ Pair<ByteBuffer, BlockType> result =
+ blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(),
+ includesMemstoreTS);
+
+ int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE;
+ HFileBlock blockOnDisk = new HFileBlock(result.getSecond(),
+ size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0,
+ includesMemstoreTS);
+
+ if (blockEncoder.getEncodingOnDisk() !=
+ DataBlockEncoding.NONE) {
+ assertEquals(BlockType.ENCODED_DATA, blockOnDisk.getBlockType());
+ assertEquals(blockEncoder.getEncodingOnDisk().getId(),
+ blockOnDisk.getDataBlockEncodingId());
+ } else {
+ assertEquals(BlockType.DATA, blockOnDisk.getBlockType());
+ }
+ }
+
+ /**
+ * Test converting blocks from disk to cache format.
+ */
+ @Test
+ public void testEncodingReadPath() {
+ HFileBlock origBlock = getSampleHFileBlock();
+ blockEncoder.diskToCacheFormat(origBlock, false);
+ }
+
+ private HFileBlock getSampleHFileBlock() {
+ ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(
+ generator.generateTestKeyValues(60), includesMemstoreTS);
+ int size = keyValues.limit();
+ ByteBuffer buf = ByteBuffer.allocate(size + HFileBlock.HEADER_SIZE);
+ buf.position(HFileBlock.HEADER_SIZE);
+ keyValues.rewind();
+ buf.put(keyValues);
+ HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
+ HFileBlock.FILL_HEADER, 0, includesMemstoreTS);
+ UNKNOWN_TABLE_AND_CF.passSchemaMetricsTo(b);
+ return b;
+ }
+
+ /**
+ * @return All possible data block encoding configurations
+ */
+ @Parameters
+ public static Collection<Object[]> getAllConfigurations() {
+ List<Object[]> configurations =
+ new ArrayList<Object[]>();
+
+ for (DataBlockEncoding diskAlgo : DataBlockEncoding.values()) {
+ for (DataBlockEncoding cacheAlgo : DataBlockEncoding.values()) {
+ if (diskAlgo != cacheAlgo && diskAlgo != DataBlockEncoding.NONE) {
+ // We allow (1) the same encoding on disk and in cache, and
+ // (2) some encoding in cache but no encoding on disk (for testing).
+ continue;
+ }
+ for (boolean includesMemstoreTS : new boolean[] {false, true}) {
+ configurations.add(new Object[] {
+ new HFileDataBlockEncoderImpl(diskAlgo, cacheAlgo),
+ new Boolean(includesMemstoreTS)});
+ }
+ }
+ }
+
+ return configurations;
+ }
+}
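
getAllConfigurations() above encodes a simple compatibility rule for the (on-disk, in-cache) encoding pairs it generates. Reduced to a predicate (a sketch restating the filter in the loop above, not an API of this patch):

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class EncodingCombinationRule {
  /** True for the combinations kept by getAllConfigurations() above. */
  static boolean isTestedCombination(DataBlockEncoding onDisk,
      DataBlockEncoding inCache) {
    // Either the same encoding is used on disk and in cache, or blocks stay
    // unencoded on disk while some encoding is applied in the cache only.
    return onDisk == inCache || onDisk == DataBlockEncoding.NONE;
  }
}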
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java Thu Jan 26 02:58:57 2012
@@ -166,7 +166,7 @@ public class TestHFilePerformance extend
minBlockSize, codecName, null);
// Writing value in one shot.
- for (long l=0 ; l<rows ; l++ ) {
+ for (long l=0; l<rows; l++ ) {
generator.getKey(key);
generator.getValue(value);
writer.append(key, value);
@@ -195,7 +195,7 @@ public class TestHFilePerformance extend
BytesWritable keyBsw;
BytesWritable valBsw;
- for (long l=0 ; l<rows ; l++ ) {
+ for (long l=0; l<rows; l++ ) {
generator.getKey(key);
keyBsw = new BytesWritable(key);
@@ -241,7 +241,7 @@ public class TestHFilePerformance extend
FSDataInputStream fin = fs.open(path);
if ("HFile".equals(fileType)){
- HFile.Reader reader = HFile.createReader(path, fs.open(path),
+ HFile.Reader reader = HFile.createReaderFromStream(path, fs.open(path),
fs.getFileStatus(path).getLen(), new CacheConfig(conf));
reader.loadFileInfo();
switch (method) {
@@ -252,7 +252,7 @@ public class TestHFilePerformance extend
{
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();
- for (long l=0 ; l<rows ; l++ ) {
+ for (long l=0; l<rows; l++ ) {
key = scanner.getKey();
val = scanner.getValue();
totalBytesRead += key.limit() + val.limit();
@@ -275,7 +275,7 @@ public class TestHFilePerformance extend
BytesWritable keyBsw = new BytesWritable();
BytesWritable valBsw = new BytesWritable();
- for (long l=0 ; l<rows ; l++ ) {
+ for (long l=0; l<rows; l++ ) {
reader.next(keyBsw, valBsw);
totalBytesRead += keyBsw.getSize() + valBsw.getSize();
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java Thu Jan 26 02:58:57 2012
@@ -167,7 +167,7 @@ public class TestHFileSeek extends TestC
int miss = 0;
long totalBytes = 0;
FSDataInputStream fsdis = fs.open(path);
- Reader reader = HFile.createReader(path, fsdis,
+ Reader reader = HFile.createReaderFromStream(path, fsdis,
fs.getFileStatus(path).getLen(), new CacheConfig(conf));
reader.loadFileInfo();
KeySampler kSampler =
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java Thu Jan 26 02:58:57 2012
@@ -76,7 +76,7 @@ public class TestHFileWriterV2 {
final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ;
HFileWriterV2 writer = new HFileWriterV2(conf, new CacheConfig(conf), fs,
- hfilePath, 4096, COMPRESS_ALGO, KeyValue.KEY_COMPARATOR);
+ hfilePath, 4096, COMPRESS_ALGO, null, KeyValue.KEY_COMPARATOR);
long totalKeyLength = 0;
long totalValueLength = 0;
@@ -125,10 +125,12 @@ public class TestHFileWriterV2 {
new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
// Comparator class name is stored in the trailer in version 2.
RawComparator<byte []> comparator = trailer.createComparator();
- HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
- trailer.getNumDataIndexLevels());
- HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
- Bytes.BYTES_RAWCOMPARATOR, 1);
+ HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
+ new HFileBlockIndex.BlockIndexReader(comparator,
+ trailer.getNumDataIndexLevels());
+ HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
+ new HFileBlockIndex.BlockIndexReader(
+ Bytes.BYTES_RAWCOMPARATOR, 1);
HFileBlock.BlockIterator blockIter = blockReader.blockRange(
trailer.getLoadOnOpenDataOffset(),
@@ -146,8 +148,10 @@ public class TestHFileWriterV2 {
// File info
FileInfo fileInfo = new FileInfo();
fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
- byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
- boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0);
+ byte [] keyValueFormatVersion = fileInfo.get(
+ HFileWriterV2.KEY_VALUE_VERSION);
+ boolean includeMemstoreTS = keyValueFormatVersion != null &&
+ Bytes.toInt(keyValueFormatVersion) > 0;
// Counters for the number of key/value pairs and the number of blocks
int entriesRead = 0;
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Thu Jan 26 02:58:57 2012
@@ -133,6 +133,8 @@ public class TestImportExport {
5, /* versions */
true /* keep deleted cells */,
HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_BLOCKSIZE,
@@ -179,6 +181,8 @@ public class TestImportExport {
5, /* versions */
true /* keep deleted cells */,
HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_BLOCKSIZE,
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java Thu Jan 26 02:58:57 2012
@@ -184,8 +184,8 @@ public class CreateRandomStoreFile {
}
StoreFile.Writer sfw = StoreFile.createWriter(fs, outputDir, blockSize,
- compr, KeyValue.COMPARATOR, conf, new CacheConfig(conf), bloomType,
- numKV);
+ compr, null, KeyValue.COMPARATOR, conf, new CacheConfig(conf),
+ bloomType, numKV);
rand = new Random();
LOG.info("Writing " + numKV + " key/value pairs");
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Tests various algorithms for key compression on an existing HFile. Useful
+ * for testing, debugging and benchmarking.
+ */
+public class DataBlockEncodingTool {
+ private static final Log LOG = LogFactory.getLog(
+ DataBlockEncodingTool.class);
+
+ private static final boolean includesMemstoreTS = true;
+
+ /**
+   * How many times the benchmark should run.
+   * More runs give statistically better data.
+   * Must be larger than BENCHMARK_N_OMIT.
+ */
+ public static int BENCHMARK_N_TIMES = 12;
+
+ /**
+   * How many initial runs should be omitted from the benchmark results.
+   * Usually one, in order to exclude setup cost.
+   * Must be 0 or larger.
+ */
+ public static int BENCHMARK_N_OMIT = 2;
+
+ private List<EncodedDataBlock> codecs = new ArrayList<EncodedDataBlock>();
+ private int totalPrefixLength = 0;
+ private int totalKeyLength = 0;
+ private int totalValueLength = 0;
+ private int totalKeyRedundancyLength = 0;
+
+ final private String compressionAlgorithmName;
+ final private Algorithm compressionAlgorithm;
+ final private Compressor compressor;
+ final private Decompressor decompressor;
+
+ /**
+ * @param compressionAlgorithmName What kind of algorithm should be used
+ * as baseline for comparison (e.g. lzo, gz).
+ */
+ public DataBlockEncodingTool(String compressionAlgorithmName) {
+ this.compressionAlgorithmName = compressionAlgorithmName;
+ this.compressionAlgorithm = Compression.getCompressionAlgorithmByName(
+ compressionAlgorithmName);
+ this.compressor = this.compressionAlgorithm.getCompressor();
+ this.decompressor = this.compressionAlgorithm.getDecompressor();
+ }
+ /**
+   * Check statistics for the given HFile for different data block encoders.
+   * @param scanner Scanner over the file to be compressed.
+   * @param kvLimit Maximum number of KeyValues to process.
+ * @throws IOException thrown if scanner is invalid
+ */
+ public void checkStatistics(final KeyValueScanner scanner, final int kvLimit)
+ throws IOException {
+ scanner.seek(KeyValue.LOWESTKEY);
+
+ KeyValue currentKv;
+
+ byte[] previousKey = null;
+ byte[] currentKey;
+
+ List<DataBlockEncoder> dataBlockEncoders =
+ DataBlockEncoding.getAllEncoders();
+
+ for (DataBlockEncoder d : dataBlockEncoders) {
+ codecs.add(new EncodedDataBlock(d, includesMemstoreTS));
+ }
+
+ int j = 0;
+ while ((currentKv = scanner.next()) != null && j < kvLimit) {
+ // Iterates through key/value pairs
+ j++;
+ currentKey = currentKv.getKey();
+ if (previousKey != null) {
+ for (int i = 0; i < previousKey.length && i < currentKey.length &&
+ previousKey[i] == currentKey[i]; ++i) {
+ totalKeyRedundancyLength++;
+ }
+ }
+
+ for (EncodedDataBlock codec : codecs) {
+ codec.addKv(currentKv);
+ }
+
+ previousKey = currentKey;
+
+ totalPrefixLength += currentKv.getLength() - currentKv.getKeyLength() -
+ currentKv.getValueLength();
+ totalKeyLength += currentKv.getKeyLength();
+ totalValueLength += currentKv.getValueLength();
+ }
+ }
+
+ /**
+   * Verify that all data block encoders are working properly.
+   *
+   * @param scanner Scanner over the file that was compressed.
+   * @param kvLimit Maximum number of KeyValues to process.
+ * @return true if all data block encoders compressed/decompressed correctly.
+ * @throws IOException thrown if scanner is invalid
+ */
+ public boolean verifyCodecs(final KeyValueScanner scanner, final int kvLimit)
+ throws IOException {
+ KeyValue currentKv;
+
+ scanner.seek(KeyValue.LOWESTKEY);
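+    // Build one decoding iterator per codec; each is advanced in lockstep
+    // with the raw scanner so the decoded KeyValues can be compared one by one.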
+ List<Iterator<KeyValue>> codecIterators =
+ new ArrayList<Iterator<KeyValue>>();
+ for(EncodedDataBlock codec : codecs) {
+ codecIterators.add(codec.getIterator());
+ }
+
+ int j = 0;
+ while ((currentKv = scanner.next()) != null && j < kvLimit) {
+ // Iterates through key/value pairs
+ ++j;
+ for (Iterator<KeyValue> it : codecIterators) {
+ KeyValue codecKv = it.next();
+ if (codecKv == null || 0 != Bytes.compareTo(
+ codecKv.getBuffer(), codecKv.getOffset(), codecKv.getLength(),
+ currentKv.getBuffer(), currentKv.getOffset(),
+ currentKv.getLength())) {
+ if (codecKv == null) {
+ LOG.error("There is a bug in codec " + it +
+ " it returned null KeyValue,");
+ } else {
+ int prefix = 0;
+ int limitLength = 2 * Bytes.SIZEOF_INT +
+ Math.min(codecKv.getLength(), currentKv.getLength());
+ while (prefix < limitLength &&
+ codecKv.getBuffer()[prefix + codecKv.getOffset()] ==
+ currentKv.getBuffer()[prefix + currentKv.getOffset()]) {
+ prefix++;
+ }
+
+ LOG.error("There is bug in codec " + it.toString() +
+ "\n on element " + j +
+ "\n codecKv.getKeyLength() " + codecKv.getKeyLength() +
+ "\n codecKv.getValueLength() " + codecKv.getValueLength() +
+ "\n codecKv.getLength() " + codecKv.getLength() +
+ "\n currentKv.getKeyLength() " + currentKv.getKeyLength() +
+ "\n currentKv.getValueLength() " + currentKv.getValueLength() +
+ "\n codecKv.getLength() " + currentKv.getLength() +
+ "\n currentKV rowLength " + currentKv.getRowLength() +
+ " familyName " + currentKv.getFamilyLength() +
+ " qualifier " + currentKv.getQualifierLength() +
+ "\n prefix " + prefix +
+ "\n codecKv '" + Bytes.toStringBinary(codecKv.getBuffer(),
+ codecKv.getOffset(), prefix) + "' diff '" +
+ Bytes.toStringBinary(codecKv.getBuffer(),
+ codecKv.getOffset() + prefix, codecKv.getLength() -
+ prefix) + "'" +
+ "\n currentKv '" + Bytes.toStringBinary(
+ currentKv.getBuffer(),
+ currentKv.getOffset(), prefix) + "' diff '" +
+ Bytes.toStringBinary(currentKv.getBuffer(),
+ currentKv.getOffset() + prefix, currentKv.getLength() -
+ prefix) + "'"
+ );
+ }
+ return false;
+ }
+ }
+ }
+
+ LOG.info("Verification was successful!");
+
+ return true;
+ }
+
+ /**
+   * Benchmark the codecs' speed.
+ */
+ public void benchmarkCodecs() {
+ int prevTotalSize = -1;
+ for (EncodedDataBlock codec : codecs) {
+ prevTotalSize = benchmarkEncoder(prevTotalSize, codec);
+ }
+
+ byte[] buffer = codecs.get(0).getRawKeyValues();
+
+ benchmarkDefaultCompression(prevTotalSize, buffer);
+ }
+
+ /**
+ * Benchmark compression/decompression throughput.
+ * @param previousTotalSize Total size used for verification. Use -1 if
+ * unknown.
+ * @param codec Tested encoder.
+ * @return Size of uncompressed data.
+ */
+ private int benchmarkEncoder(int previousTotalSize, EncodedDataBlock codec) {
+ int prevTotalSize = previousTotalSize;
+ int totalSize = 0;
+
+ // decompression time
+ List<Long> durations = new ArrayList<Long>();
+ for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+ totalSize = 0;
+
+ Iterator<KeyValue> it;
+
+ it = codec.getIterator();
+
+ // count only the algorithm time, without memory allocations
+      // (except the first time)
+ final long startTime = System.nanoTime();
+ while (it.hasNext()) {
+ totalSize += it.next().getLength();
+ }
+ final long finishTime = System.nanoTime();
+ if (itTime >= BENCHMARK_N_OMIT) {
+ durations.add(finishTime - startTime);
+ }
+
+ if (prevTotalSize != -1 && prevTotalSize != totalSize) {
+ throw new IllegalStateException(String.format(
+ "Algorithm '%s' decoded data to different size", codec.toString()));
+ }
+ prevTotalSize = totalSize;
+ }
+
+ // compression time
+ List<Long> compressDurations = new ArrayList<Long>();
+ for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+ final long startTime = System.nanoTime();
+ codec.doCompressData();
+ final long finishTime = System.nanoTime();
+ if (itTime >= BENCHMARK_N_OMIT) {
+ compressDurations.add(finishTime - startTime);
+ }
+ }
+
+ System.out.println(codec.toString() + ":");
+ printBenchmarkResult(totalSize, compressDurations, false);
+ printBenchmarkResult(totalSize, durations, true);
+
+ return prevTotalSize;
+ }
+
+ private void benchmarkDefaultCompression(int totalSize, byte[] rawBuffer) {
+ benchmarkAlgorithm(compressionAlgorithm, compressor, decompressor,
+ compressionAlgorithmName.toUpperCase(), rawBuffer, 0, totalSize);
+ }
+
+ /**
+ * Check decompress performance of a given algorithm and print it.
+ * @param algorithm Compression algorithm.
+ * @param compressorCodec Compressor to be tested.
+ * @param decompressorCodec Decompressor of the same algorithm.
+ * @param name Name of algorithm.
+ * @param buffer Buffer to be compressed.
+ * @param offset Position of the beginning of the data.
+ * @param length Length of data in buffer.
+ */
+ public static void benchmarkAlgorithm(
+ Compression.Algorithm algorithm,
+ Compressor compressorCodec,
+ Decompressor decompressorCodec,
+ String name,
+ byte[] buffer, int offset, int length) {
+ System.out.println(name + ":");
+
+ // compress it
+ List<Long> compressDurations = new ArrayList<Long>();
+ ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+ OutputStream compressingStream;
+ try {
+ for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+ final long startTime = System.nanoTime();
+ compressingStream = algorithm.createCompressionStream(
+ compressedStream, compressorCodec, 0);
+ compressingStream.write(buffer, offset, length);
+ compressingStream.flush();
+ compressedStream.toByteArray();
+
+ final long finishTime = System.nanoTime();
+
+ // add time record
+ if (itTime >= BENCHMARK_N_OMIT) {
+ compressDurations.add(finishTime - startTime);
+ }
+
+ if (itTime + 1 < BENCHMARK_N_TIMES) { // not the last one
+ compressedStream.reset();
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Benchmark, or encoding algorithm '%s' cause some stream problems",
+ name), e);
+ }
+ printBenchmarkResult(length, compressDurations, false);
+
+ byte[] compBuffer = compressedStream.toByteArray();
+
+ // uncompress it several times and measure performance
+ List<Long> durations = new ArrayList<Long>();
+ for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+ final long startTime = System.nanoTime();
+ byte[] newBuf = new byte[length + 1];
+
+ try {
+
+ ByteArrayInputStream downStream = new ByteArrayInputStream(compBuffer,
+ 0, compBuffer.length);
+ InputStream decompressedStream = algorithm.createDecompressionStream(
+ downStream, decompressorCodec, 0);
+
+ int destOffset = 0;
+ int nextChunk;
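+        // Drain the decompression stream chunk by chunk while it reports
+        // readily available bytes.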
+ while ((nextChunk = decompressedStream.available()) > 0) {
+ destOffset += decompressedStream.read(newBuf, destOffset, nextChunk);
+ }
+ decompressedStream.close();
+
+ // iterate over KeyValue
+ KeyValue kv;
+ for (int pos = 0; pos < length; pos += kv.getLength()) {
+ kv = new KeyValue(newBuf, pos);
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Decoding path in '%s' algorithm cause exception ", name), e);
+ }
+
+ final long finishTime = System.nanoTime();
+
+ // check correctness
+ if (0 != Bytes.compareTo(buffer, 0, length, newBuf, 0, length)) {
+ int prefix = 0;
+ for(; prefix < buffer.length && prefix < newBuf.length; ++prefix) {
+ if (buffer[prefix] != newBuf[prefix]) {
+ break;
+ }
+ }
+        throw new RuntimeException(String.format(
+            "Algorithm '%s' is corrupting the data (first difference at byte %d)",
+            name, prefix));
+ }
+
+ // add time record
+ if (itTime >= BENCHMARK_N_OMIT) {
+ durations.add(finishTime - startTime);
+ }
+ }
+ printBenchmarkResult(length, durations, true);
+ }
+
+  private static void printBenchmarkResult(int totalSize,
+      List<Long> durationsInNanoSec, boolean isDecompression) {
+    long meanTime = 0;
+    for (long time : durationsInNanoSec) {
+      meanTime += time;
+    }
+    meanTime /= durationsInNanoSec.size();
+
+    long standardDev = 0;
+    for (long time : durationsInNanoSec) {
+      standardDev += (time - meanTime) * (time - meanTime);
+    }
+    standardDev = (long) Math.sqrt(standardDev / durationsInNanoSec.size());
+
+    final double nanosInSecond = 1000.0 * 1000.0 * 1000.0;
+    double mbPerSec = (totalSize * nanosInSecond) / (1024.0 * 1024.0 * meanTime);
+    double mbPerSecDev = (totalSize * nanosInSecond) /
+        (1024.0 * 1024.0 * (meanTime - standardDev));
+
+ System.out.println(String.format(
+ " %s performance:%s %6.2f MB/s (+/- %.2f MB/s)",
+ isDecompression ? "Decompression" : "Compression",
+ isDecompression ? "" : " ",
+ mbPerSec, mbPerSecDev - mbPerSec));
+ }
+
+ /**
+ * Display statistics of different compression algorithms.
+ */
+ public void displayStatistics() {
+ int totalLength = totalPrefixLength + totalKeyLength + totalValueLength;
+ compressor.reset();
+
+ for(EncodedDataBlock codec : codecs) {
+ System.out.println(codec.toString());
+ int saved = totalKeyLength + totalPrefixLength + totalValueLength
+ - codec.getSize();
+ System.out.println(
+ String.format(" Saved bytes: %8d", saved));
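+      // Express the savings relative to the key-related bytes (KV overhead +
+      // keys) and relative to all bytes in the file.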
+ double keyRatio = (saved * 100.0) / (totalPrefixLength + totalKeyLength);
+ double allRatio = (saved * 100.0) / totalLength;
+ System.out.println(
+ String.format(" Key compression ratio: %.2f %%", keyRatio));
+ System.out.println(
+ String.format(" All compression ratio: %.2f %%", allRatio));
+ int compressedSize = codec.checkCompressedSize(compressor);
+ System.out.println(
+ String.format(" %s compressed size: %8d",
+ compressionAlgorithmName.toUpperCase(), compressedSize));
+      double compressRatio = 100.0 * (1.0 - compressedSize / (0.0 + totalLength));
+      System.out.println(
+          String.format("  %s compression ratio: %.2f %%",
+          compressionAlgorithmName.toUpperCase(), compressRatio));
+ }
+
+ System.out.println(
+ String.format("Total KV prefix length: %8d", totalPrefixLength));
+ System.out.println(
+ String.format("Total key length: %8d", totalKeyLength));
+ System.out.println(
+ String.format("Total key redundancy: %8d",
+ totalKeyRedundancyLength));
+ System.out.println(
+ String.format("Total value length: %8d", totalValueLength));
+ }
+
+ /**
+ * Test a data block encoder on the given HFile. Output results to console.
+   * @param kvLimit Maximum number of KeyValues which will be analyzed.
+ * @param hfilePath an HFile path on the file system.
+ * @param compressionName Compression algorithm used for comparison.
+ * @param doBenchmark Run performance benchmarks.
+ * @param doVerify Verify correctness.
+   * @throws IOException If the HFile path is invalid or the file cannot be read.
+ */
+ public static void testCodecs(int kvLimit, String hfilePath,
+ String compressionName, boolean doBenchmark, boolean doVerify)
+ throws IOException {
+ // create environment
+ Path path = new Path(hfilePath);
+ Configuration conf = HBaseConfiguration.create();
+ CacheConfig cacheConf = new CacheConfig(conf);
+ FileSystem fs = FileSystem.get(conf);
+ StoreFile hsf = new StoreFile(fs, path, conf, cacheConf,
+ StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
+
+ StoreFile.Reader reader = hsf.createReader();
+ reader.loadFileInfo();
+ KeyValueScanner scanner = reader.getStoreFileScanner(true, true);
+
+ // run the utilities
+ DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
+ comp.checkStatistics(scanner, kvLimit);
+ if (doVerify) {
+ comp.verifyCodecs(scanner, kvLimit);
+ }
+ if (doBenchmark) {
+ comp.benchmarkCodecs();
+ }
+ comp.displayStatistics();
+
+ // cleanup
+ scanner.close();
+ reader.close(cacheConf.shouldEvictOnClose());
+ }
+
+ private static void printUsage(Options options) {
+ System.err.println("Usage:");
+ System.err.println(String.format("./hbase %s <options>",
+ DataBlockEncodingTool.class.getName()));
+ System.err.println("Options:");
+ for (Object it : options.getOptions()) {
+ Option opt = (Option) it;
+ if (opt.hasArg()) {
+ System.err.println(String.format("-%s %s: %s", opt.getOpt(),
+ opt.getArgName(), opt.getDescription()));
+ } else {
+ System.err.println(String.format("-%s: %s", opt.getOpt(),
+ opt.getDescription()));
+ }
+ }
+ }
+
+ /**
+ * A command line interface to benchmarks.
+   * @param args Command line arguments; the -f option with an HFile path is
+   *          required.
+   * @throws IOException If the specified file cannot be read.
+ */
+ public static void main(final String[] args) throws IOException {
+ // set up user arguments
+ Options options = new Options();
+ options.addOption("f", true, "HFile to analyse (REQUIRED)");
+ options.getOption("f").setArgName("FILENAME");
+ options.addOption("n", true,
+ "Limit number of KeyValue which will be analysed");
+ options.getOption("n").setArgName("NUMBER");
+ options.addOption("b", false, "Measure read throughput");
+ options.addOption("c", false, "Omit corectness tests.");
+ options.addOption("a", true,
+ "What kind of compression algorithm use for comparison.");
+
+ // parse arguments
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e) {
+ System.err.println("Could not parse arguments!");
+ System.exit(-1);
+ return; // avoid warning
+ }
+
+ int kvLimit = Integer.MAX_VALUE;
+ if (cmd.hasOption("n")) {
+ kvLimit = Integer.parseInt(cmd.getOptionValue("n"));
+ }
+
+ // basic argument sanity checks
+ if (!cmd.hasOption("f")) {
+ System.err.println("ERROR: Filename is required!");
+ printUsage(options);
+ System.exit(-1);
+ }
+
+ if (!(new File(cmd.getOptionValue("f"))).exists()) {
+ System.err.println(String.format("ERROR: file '%s' doesn't exist!",
+ cmd.getOptionValue("f")));
+ printUsage(options);
+ System.exit(-1);
+ }
+
+ String pathName = cmd.getOptionValue("f");
+ String compressionName = "gz";
+ if (cmd.hasOption("a")) {
+ compressionName = cmd.getOptionValue("a");
+ }
+ boolean doBenchmark = cmd.hasOption("b");
+ boolean doVerify = !cmd.hasOption("c");
+
+ testCodecs(kvLimit, pathName, compressionName, doBenchmark, doVerify);
+ }
+
+}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+
+/**
+ * Test seek performance for encoded data blocks. Read an HFile and do several
+ * random seeks.
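+ * <p>
+ * Example invocation (the HFile path is only illustrative):
+ * <pre>
+ *   ./hbase org.apache.hadoop.hbase.regionserver.EncodedSeekPerformanceTest \
+ *       /path/to/hfile
+ * </pre>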
+ */
+public class EncodedSeekPerformanceTest {
+ private static final double NANOSEC_IN_SEC = 1000.0 * 1000.0 * 1000.0;
+ private static final double BYTES_IN_MEGABYTES = 1024.0 * 1024.0;
+ /** Default number of seeks which will be used in benchmark. */
+ public static int DEFAULT_NUMBER_OF_SEEKS = 10000;
+
+ private final HBaseTestingUtility testingUtility = new HBaseTestingUtility();
+ private Configuration configuration = testingUtility.getConfiguration();
+ private CacheConfig cacheConf = new CacheConfig(configuration);
+ private Random randomizer;
+ private int numberOfSeeks;
+
+ /** Use this benchmark with default options */
+ public EncodedSeekPerformanceTest() {
+ configuration.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.5f);
+    randomizer = new Random(42L);
+ numberOfSeeks = DEFAULT_NUMBER_OF_SEEKS;
+ }
+
+ private List<KeyValue> prepareListOfTestSeeks(Path path) throws IOException {
+ List<KeyValue> allKeyValues = new ArrayList<KeyValue>();
+
+ // read all of the key values
+ StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
+ path, configuration, cacheConf, BloomType.NONE,
+ NoOpDataBlockEncoder.INSTANCE);
+
+ StoreFile.Reader reader = storeFile.createReader();
+ StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
+ KeyValue current;
+
+ scanner.seek(KeyValue.LOWESTKEY);
+ while (null != (current = scanner.next())) {
+ allKeyValues.add(current);
+ }
+
+ storeFile.closeReader(cacheConf.shouldEvictOnClose());
+
+    // pick seek targets at random
+ List<KeyValue> seeks = new ArrayList<KeyValue>();
+ for (int i = 0; i < numberOfSeeks; ++i) {
+ KeyValue keyValue = allKeyValues.get(
+ randomizer.nextInt(allKeyValues.size()));
+ seeks.add(keyValue);
+ }
+
+ clearBlockCache();
+
+ return seeks;
+ }
+
+ private void runTest(Path path, HFileDataBlockEncoder blockEncoder,
+ List<KeyValue> seeks) throws IOException {
+ // read all of the key values
+ StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
+ path, configuration, cacheConf, BloomType.NONE, blockEncoder);
+
+ long totalSize = 0;
+
+ StoreFile.Reader reader = storeFile.createReader();
+ StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
+
+ long startReadingTime = System.nanoTime();
+ KeyValue current;
+ scanner.seek(KeyValue.LOWESTKEY);
+ while (null != (current = scanner.next())) { // just iterate it!
+ if (current.getLength() < 0) {
+ throw new IOException("Negative KV size: " + current);
+ }
+ totalSize += current.getLength();
+ }
+ long finishReadingTime = System.nanoTime();
+
+ // do seeks
+ long startSeeksTime = System.nanoTime();
+ for (KeyValue keyValue : seeks) {
+ scanner.seek(keyValue);
+ KeyValue toVerify = scanner.next();
+      if (!keyValue.equals(toVerify)) {
+        System.out.println(String.format("KeyValue doesn't match:\n" +
+            "Orig key: %s\n" +
+            "Ret key: %s", keyValue.getKeyString(),
+            toVerify == null ? "null" : toVerify.getKeyString()));
+ break;
+ }
+ }
+ long finishSeeksTime = System.nanoTime();
+ if (finishSeeksTime < startSeeksTime) {
+ throw new AssertionError("Finish time " + finishSeeksTime +
+ " is earlier than start time " + startSeeksTime);
+ }
+
+ // write some stats
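+    // Read throughput is the total KV byte count divided by the elapsed scan
+    // time; the seek rate is the number of seeks divided by the elapsed time.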
+ double readInMbPerSec = (totalSize * NANOSEC_IN_SEC) /
+ (BYTES_IN_MEGABYTES * (finishReadingTime - startReadingTime));
+ double seeksPerSec = (seeks.size() * NANOSEC_IN_SEC) /
+ (finishSeeksTime - startSeeksTime);
+
+ storeFile.closeReader(cacheConf.shouldEvictOnClose());
+ clearBlockCache();
+
+ System.out.println(blockEncoder);
+ System.out.printf(" Read speed: %8.2f (MB/s)\n", readInMbPerSec);
+ System.out.printf(" Seeks per second: %8.2f (#/s)\n", seeksPerSec);
+ System.out.printf(" Total KV size: %d\n", totalSize);
+ }
+
+ /**
+ * @param path Path to the HFile which will be used.
+ * @param encoders List of encoders which will be used for tests.
+ * @throws IOException if there is a bug while reading from disk
+ */
+ public void runTests(Path path, List<HFileDataBlockEncoder> encoders)
+ throws IOException {
+ List<KeyValue> seeks = prepareListOfTestSeeks(path);
+
+ for (HFileDataBlockEncoder blockEncoder : encoders) {
+ runTest(path, blockEncoder, seeks);
+ }
+ }
+
+ /**
+ * Command line interface:
+   * @param args Takes one argument: the path of the HFile.
+ * @throws IOException if there is a bug while reading from disk
+ */
+ public static void main(final String[] args) throws IOException {
+ if (args.length < 1) {
+ printUsage();
+ System.exit(-1);
+ }
+
+ Path path = new Path(args[0]);
+ List<HFileDataBlockEncoder> encoders =
+ new ArrayList<HFileDataBlockEncoder>();
+
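+    // Benchmark a baseline without encoding first, then each available
+    // algorithm; here NONE is assumed to be the on-disk encoding while the
+    // algorithm is applied to blocks in the cache.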
+ encoders.add(new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE));
+ for (DataBlockEncoding encodingAlgo : DataBlockEncoding.values()) {
+ encoders.add(new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE,
+ encodingAlgo));
+ }
+
+ EncodedSeekPerformanceTest utility = new EncodedSeekPerformanceTest();
+ utility.runTests(path, encoders);
+
+ System.exit(0);
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: one argument, name of the HFile");
+ }
+
+ private void clearBlockCache() {
+ ((LruBlockCache) cacheConf.getBlockCache()).clearCache();
+ }
+}