Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC
svn commit: r1223020 [4/5] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/io/encoding/
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/mapr...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Sat Dec 24 21:20:39 2011
@@ -35,8 +35,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +46,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.Compressor;
/**
- * Writes version 1 HFiles. Mainly used for testing backwards-compatibilty.
+ * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
*/
public class HFileWriterV1 extends AbstractHFileWriter {
@@ -91,16 +93,17 @@ public class HFileWriterV1 extends Abstr
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
- Compression.Algorithm compressAlgo, final KeyComparator comparator)
+ Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
+ KeyComparator comparator)
throws IOException {
return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
- compressAlgo, comparator);
+ compressAlgo, dataBlockEncoder, comparator);
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
- String compressAlgoName,
- final KeyComparator comparator) throws IOException {
+ String compressAlgoName, KeyComparator comparator)
+ throws IOException {
return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
compressAlgoName, comparator);
}
@@ -117,7 +120,8 @@ public class HFileWriterV1 extends Abstr
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final Compression.Algorithm compress,
final KeyComparator c) throws IOException {
- return new HFileWriterV1(cacheConf, ostream, blockSize, compress, c);
+ return new HFileWriterV1(cacheConf, ostream, blockSize, compress,
+ new NoOpDataBlockEncoder(), c);
}
}
@@ -127,7 +131,7 @@ public class HFileWriterV1 extends Abstr
throws IOException {
this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
- null);
+ new NoOpDataBlockEncoder(), null);
}
/**
@@ -138,15 +142,18 @@ public class HFileWriterV1 extends Abstr
Path path, int blockSize, String compressAlgoName,
final KeyComparator comparator) throws IOException {
this(conf, cacheConf, fs, path, blockSize,
- compressionByName(compressAlgoName), comparator);
+ compressionByName(compressAlgoName), new NoOpDataBlockEncoder(),
+ comparator);
}
/** Constructor that takes a path, creates and closes the output stream. */
- public HFileWriterV1(Configuration conf, CacheConfig cacheConf, FileSystem fs,
- Path path, int blockSize, Compression.Algorithm compress,
+ public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
+ FileSystem fs, Path path,
+ int blockSize, Compression.Algorithm compress,
+ HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException {
super(cacheConf, createOutputStream(conf, fs, path), path,
- blockSize, compress, comparator);
+ blockSize, compress, blockEncoder, comparator);
SchemaMetrics.configureGlobally(conf);
}
@@ -157,15 +164,17 @@ public class HFileWriterV1 extends Abstr
throws IOException {
this(cacheConf, outputStream, blockSize,
Compression.getCompressionAlgorithmByName(compressAlgoName),
- comparator);
+ new NoOpDataBlockEncoder(), comparator);
}
/** Constructor that takes a stream. */
public HFileWriterV1(CacheConfig cacheConf,
final FSDataOutputStream outputStream, final int blockSize,
- final Compression.Algorithm compress, final KeyComparator comparator)
+ final Compression.Algorithm compress,
+ HFileDataBlockEncoder blockEncoder, final KeyComparator comparator)
throws IOException {
- super(cacheConf, outputStream, null, blockSize, compress, comparator);
+ super(cacheConf, outputStream, null, blockSize, compress,
+ blockEncoder, comparator);
}
/**
@@ -202,13 +211,17 @@ public class HFileWriterV1 extends Abstr
if (cacheConf.shouldCacheDataOnWrite()) {
baosDos.flush();
+ // we do not do dataBlockEncoding on disk HFile V2.
byte[] bytes = baos.toByteArray();
HFileBlock cBlock = new HFileBlock(BlockType.DATA,
(int) (outputStream.getPos() - blockBegin), bytes.length, -1,
- ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
- passSchemaMetricsTo(cBlock);
+ ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
+ blockBegin, MemStore.NO_PERSISTENT_TS);
+ HFileBlock codedBlock = blockEncoder.beforeBlockCache(cBlock,
+ false);
+ passSchemaMetricsTo(codedBlock);
cacheConf.getBlockCache().cacheBlock(
- HFile.getBlockCacheKey(name, blockBegin), cBlock);
+ HFile.getBlockCacheKey(name, blockBegin), codedBlock);
baosDos.close();
}
blockNumber++;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Sat Dec 24 21:20:39 2011
@@ -49,9 +49,13 @@ public class HFileWriterV2 extends Abstr
static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
/** Max memstore (mvcc) timestamp in FileInfo */
- public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+ public static final byte [] MAX_MEMSTORE_TS_KEY =
+ Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+
/** KeyValue version in FileInfo */
- public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+ public static final byte [] KEY_VALUE_VERSION =
+ Bytes.toBytes("KEY_VALUE_VERSION");
+
/** Version for KeyValue which includes memstore timestamp */
public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
@@ -92,10 +96,10 @@ public class HFileWriterV2 extends Abstr
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
- Compression.Algorithm compress,
+ Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException {
return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
- compress, comparator);
+ compress, blockEncoder, comparator);
}
@Override
@@ -128,7 +132,7 @@ public class HFileWriterV2 extends Abstr
FileSystem fs, Path path)
throws IOException {
this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
- HFile.DEFAULT_COMPRESSION_ALGORITHM, null);
+ HFile.DEFAULT_COMPRESSION_ALGORITHM, null, null);
}
/**
@@ -139,15 +143,16 @@ public class HFileWriterV2 extends Abstr
Path path, int blockSize, String compressAlgoName,
final KeyComparator comparator) throws IOException {
this(conf, cacheConf, fs, path, blockSize,
- compressionByName(compressAlgoName), comparator);
+ compressionByName(compressAlgoName), null, comparator);
}
/** Constructor that takes a path, creates and closes the output stream. */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs,
Path path, int blockSize, Compression.Algorithm compressAlgo,
+ HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException {
super(cacheConf, createOutputStream(conf, fs, path), path,
- blockSize, compressAlgo, comparator);
+ blockSize, compressAlgo, blockEncoder, comparator);
SchemaMetrics.configureGlobally(conf);
finishInit(conf);
}
@@ -167,7 +172,8 @@ public class HFileWriterV2 extends Abstr
final FSDataOutputStream outputStream, final int blockSize,
final Compression.Algorithm compress, final KeyComparator comparator)
throws IOException {
- super(cacheConf, outputStream, null, blockSize, compress, comparator);
+ super(cacheConf, outputStream, null, blockSize, compress, null,
+ comparator);
finishInit(conf);
}
@@ -177,7 +183,8 @@ public class HFileWriterV2 extends Abstr
throw new IllegalStateException("finishInit called twice");
// HFile filesystem-level (non-caching) block writer
- fsBlockWriter = new HFileBlock.Writer(compressAlgo);
+ fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
+ includeMemstoreTS);
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
@@ -225,8 +232,9 @@ public class HFileWriterV2 extends Abstr
long startTimeNs = System.nanoTime();
// Update the first data block offset for scanning.
- if (firstDataBlockOffset == -1)
+ if (firstDataBlockOffset == -1) {
firstDataBlockOffset = outputStream.getPos();
+ }
// Update the last data block offset
lastDataBlockOffset = outputStream.getPos();
@@ -242,10 +250,12 @@ public class HFileWriterV2 extends Abstr
HFile.writeOps.incrementAndGet();
if (cacheConf.shouldCacheDataOnWrite()) {
- HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching();
- passSchemaMetricsTo(blockForCaching);
+ HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
+ HFileBlock codedBlock = blockEncoder.beforeBlockCache(cBlock,
+ includeMemstoreTS);
+ passSchemaMetricsTo(codedBlock);
cacheConf.getBlockCache().cacheBlock(
- HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
+ HFile.getBlockCacheKey(name, lastDataBlockOffset), codedBlock);
}
}
@@ -256,7 +266,7 @@ public class HFileWriterV2 extends Abstr
long offset = outputStream.getPos();
boolean cacheThisBlock = ibw.cacheOnWrite();
ibw.writeInlineBlock(fsBlockWriter.startWriting(
- ibw.getInlineBlockType(), cacheThisBlock));
+ ibw.getInlineBlockType()));
fsBlockWriter.writeHeaderAndData(outputStream);
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
@@ -265,9 +275,11 @@ public class HFileWriterV2 extends Abstr
if (cacheThisBlock) {
// Cache this block on write.
HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
- passSchemaMetricsTo(cBlock);
+ HFileBlock codedBlock = blockEncoder.beforeBlockCache(cBlock,
+ includeMemstoreTS);
+ passSchemaMetricsTo(codedBlock);
cacheConf.getBlockCache().cacheBlock(
- HFile.getBlockCacheKey(name, offset), cBlock);
+ HFile.getBlockCacheKey(name, offset), codedBlock);
}
}
}
@@ -280,8 +292,7 @@ public class HFileWriterV2 extends Abstr
*/
private void newBlock() throws IOException {
// This is where the next block begins.
- fsBlockWriter.startWriting(BlockType.DATA,
- cacheConf.shouldCacheDataOnWrite());
+ fsBlockWriter.startWriting(BlockType.DATA);
firstKeyInBlock = null;
}
@@ -413,8 +424,7 @@ public class HFileWriterV2 extends Abstr
// store the beginning offset
long offset = outputStream.getPos();
// write the metadata content
- DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META,
- cacheConf.shouldCacheDataOnWrite());
+ DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
metaData.get(i).write(dos);
fsBlockWriter.writeHeaderAndData(outputStream);
@@ -440,7 +450,7 @@ public class HFileWriterV2 extends Abstr
// Meta block index.
metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
- BlockType.ROOT_INDEX, false), "meta");
+ BlockType.ROOT_INDEX), "meta");
fsBlockWriter.writeHeaderAndData(outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
@@ -450,8 +460,7 @@ public class HFileWriterV2 extends Abstr
}
// File info
- writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
- false));
+ writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
fsBlockWriter.writeHeaderAndData(outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Sat Dec 24 21:20:39 2011
@@ -722,4 +722,10 @@ public class LruBlockCache implements Bl
public void shutdown() {
this.scheduleThreadPool.shutdown();
}
+
+ /** Clears the cache. Used in tests. */
+ public void clearCache() {
+ map.clear();
+ }
+
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Does not perform any kind of encoding/decoding.
+ */
+public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
+
+ @Override
+ public HFileBlock afterReadFromDisk(HFileBlock block) {
+ if (block.getBlockType() == BlockType.ENCODED_DATA) {
+ throw new IllegalStateException("Unexpected encoded block");
+ }
+ return block;
+ }
+
+ @Override
+ public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
+ boolean isCompaction, boolean includesMemstoreTS) {
+ return block;
+ }
+
+ @Override
+ public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+ ByteBuffer in, boolean includesMemstoreTS) {
+ return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+ }
+
+ @Override
+ public HFileBlock beforeBlockCache(HFileBlock block,
+ boolean includesMemstoreTS) {
+ return block;
+ }
+
+ @Override
+ public HFileBlock afterBlockCache(HFileBlock block, boolean isCompaction,
+ boolean includesMemstoreTS) {
+ return block;
+ }
+
+ @Override
+ public boolean useEncodedScanner(boolean isCompaction) {
+ return false;
+ }
+
+ @Override
+ public void saveMetadata(StoreFile.Writer storeFileWriter) {
+ }
+}
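[Editor's note, not part of the commit] A minimal sketch of how the cache-on-write paths above are expected to use the new pass-through encoder. It assumes a hypothetical, already-built DATA block and cache key, and uses only the methods visible in this diff.

import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;

public class NoOpEncoderSketch {
  /** Mirrors the cache-on-write branch in HFileWriterV1/V2 shown above. */
  static void cacheDataBlock(BlockCache cache, String cacheKey,
      HFileBlock block, boolean includesMemstoreTS) {
    HFileDataBlockEncoder encoder = new NoOpDataBlockEncoder();
    // The no-op encoder hands back the very same block instance, so the
    // block is cached exactly as it was written to disk.
    HFileBlock codedBlock = encoder.beforeBlockCache(block, includesMemstoreTS);
    cache.cacheBlock(cacheKey, codedBlock);
    // A no-op encoder also never asks for the encoded scanner.
    assert !encoder.useEncodedScanner(false);
  }
}

A non-trivial encoder may return a different (encoded) block from beforeBlockCache, which is why the writers above now cache codedBlock rather than the raw cBlock.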
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Sat Dec 24 21:20:39 2011
@@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.io.Refere
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -528,9 +530,13 @@ public class LoadIncrementalHFiles exten
CacheConfig cacheConf = new CacheConfig(conf);
HalfStoreFileReader halfReader = null;
StoreFile.Writer halfWriter = null;
+ HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
+ familyDescriptor.getDataBlockEncodingOnDisk(),
+ familyDescriptor.getDataBlockEncodingInCache(),
+ familyDescriptor.useEncodedDataBlockSeek());
try {
halfReader = new HalfStoreFileReader(fs, inFile, cacheConf,
- reference);
+ reference, dataBlockEncoder);
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
int blocksize = familyDescriptor.getBlocksize();
@@ -538,7 +544,8 @@ public class LoadIncrementalHFiles exten
BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
halfWriter = new StoreFile.Writer(
- fs, outFile, blocksize, compression, conf, cacheConf,
+ fs, outFile, blocksize, compression, dataBlockEncoder,
+ conf, cacheConf,
KeyValue.COMPARATOR, bloomFilterType, 0);
HFileScanner scanner = halfReader.getScanner(false, false, false);
scanner.seekTo();
@@ -638,7 +645,6 @@ public class LoadIncrementalHFiles exten
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
for (Path hfile : hfiles) {
if (hfile.getName().startsWith("_")) continue;
-
HFile.Reader reader = HFile.createReader(fs, hfile,
new CacheConfig(getConf()));
final byte[] first, last;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Dec 24 21:20:39 2011
@@ -870,6 +870,10 @@ public class MemStore implements HeapSiz
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
(2 * ClassSize.CONCURRENT_SKIPLISTMAP));
+ // Constants for whether to serialize memstore timestamp.
+ public static final boolean NO_PERSISTENT_TS = false;
+ public static final boolean PERSISTENT_TS = true;
+
/*
* Calculate how the MemStore size has changed. Includes overhead of the
* backing Map.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Dec 24 21:20:39 2011
@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
@@ -144,6 +146,7 @@ public class Store extends SchemaConfigu
private final Compression.Algorithm compression;
/** Compression algorithm for major compaction */
private final Compression.Algorithm compactionCompression;
+ private HFileDataBlockEncoder dataBlockEncoder;
// Comparing KeyValues
final KeyValue.KVComparator comparator;
@@ -181,6 +184,12 @@ public class Store extends SchemaConfigu
this.compactionCompression =
(family.getCompactionCompression() != Compression.Algorithm.NONE) ?
family.getCompactionCompression() : this.compression;
+
+ this.dataBlockEncoder =
+ new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
+ family.getDataBlockEncodingInCache(),
+ family.useEncodedDataBlockSeek());
+
this.comparator = info.getComparator();
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
this.ttl = family.getTimeToLive();
@@ -270,6 +279,21 @@ public class Store extends SchemaConfigu
public Path getHomedir() {
return homedir;
}
+
+ /**
+ * @return the data block encoder
+ */
+ public HFileDataBlockEncoder getDataBlockEncoder() {
+ return dataBlockEncoder;
+ }
+
+ /**
+ * Should be used only in tests.
+ * @param blockEncoder the block delta encoder to use
+ */
+ public void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
+ this.dataBlockEncoder = blockEncoder;
+ }
/*
* Creates an unsorted list of StoreFile loaded from the given directory.
@@ -292,8 +316,9 @@ public class Store extends SchemaConfigu
continue;
}
StoreFile curfile = new StoreFile(fs, p, this.conf, this.cacheConf,
- this.family.getBloomFilterType());
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
passSchemaMetricsTo(curfile);
+
curfile.createReader();
long length = curfile.getReader().length();
this.storeSize += length;
@@ -447,8 +472,9 @@ public class Store extends SchemaConfigu
StoreFile.rename(fs, srcPath, dstPath);
StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType());
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
passSchemaMetricsTo(sf);
+
sf.createReader();
LOG.info("Moved hfile " + srcPath + " into store directory " +
@@ -555,7 +581,6 @@ public class Store extends SchemaConfigu
MonitoredTask status)
throws IOException {
StoreFile.Writer writer;
- String fileName;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
long flushed = 0;
@@ -651,8 +676,9 @@ public class Store extends SchemaConfigu
status.setStatus("Flushing " + this + ": reopening flushed file");
StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType());
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
passSchemaMetricsTo(sf);
+
StoreFile.Reader r = sf.createReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@@ -690,7 +716,7 @@ public class Store extends SchemaConfigu
Compression.Algorithm compression)
throws IOException {
StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
- blocksize, compression, comparator, conf, cacheConf,
+ blocksize, compression, dataBlockEncoder, comparator, conf, cacheConf,
family.getBloomFilterType(), maxKeyCount);
// The store file writer's path does not include the CF name, so we need
// to configure the HFile writer directly.
@@ -1416,7 +1442,7 @@ public class Store extends SchemaConfigu
StoreFile storeFile = null;
try {
storeFile = new StoreFile(this.fs, path, this.conf,
- this.cacheConf, this.family.getBloomFilterType());
+ this.cacheConf, this.family.getBloomFilterType(), null);
passSchemaMetricsTo(storeFile);
storeFile.createReader();
} catch (IOException e) {
@@ -1468,7 +1494,7 @@ public class Store extends SchemaConfigu
" to " + destPath);
}
result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType());
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
passSchemaMetricsTo(result);
result.createReader();
}
@@ -2056,8 +2082,8 @@ public class Store extends SchemaConfigu
}
public static final long FIXED_OVERHEAD =
- ClassSize.align(new SchemaConfigured().heapSize()
- + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+ ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+ + (19 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
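[Editor's note, not part of the commit] A hedged sketch of how the per-family encoder is built from the column descriptor, mirroring the Store constructor and LoadIncrementalHFiles changes above; the descriptor getters are the ones this diff already calls.

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

public class StoreEncoderSketch {
  static HFileDataBlockEncoder encoderFor(HColumnDescriptor family) {
    return new HFileDataBlockEncoderImpl(
        family.getDataBlockEncodingOnDisk(),   // encoding written to HFiles
        family.getDataBlockEncodingInCache(),  // encoding kept in the block cache
        family.useEncodedDataBlockSeek());     // whether to seek in encoded blocks
  }
}

Tests can bypass this wiring through the new Store.setDataBlockEncoderInTest(...) setter shown above.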
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Dec 24 21:20:39 2011
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -130,6 +132,10 @@ public class StoreFile extends SchemaCon
/** Key for timestamp of earliest-put in metadata*/
public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
+ /** Type of encoding used for data blocks in HFile. Stored in file info. */
+ public static final byte[] DATA_BLOCK_ENCODING =
+ Bytes.toBytes("DATA_BLOCK_ENCODING");
+
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
// Need to make it 8k for testing.
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@@ -148,7 +154,10 @@ public class StoreFile extends SchemaCon
// Block cache configuration and reference.
private final CacheConfig cacheConf;
- // HDFS blocks distribuion information
+ // What kind of data block encoding will be used
+ private HFileDataBlockEncoder dataBlockEncoder;
+
+ // HDFS blocks distribution information
private HDFSBlocksDistribution hdfsBlocksDistribution;
// Keys for metadata stored in backing HFile.
@@ -207,6 +216,23 @@ public class StoreFile extends SchemaCon
private long modificationTimeStamp = 0L;
/**
+ * Ignores Bloom filters and does not use the inMemory or
+ * in-memory dataBlockEncoding options.
+ * @param fs The current file system to use
+ * @param p The path of the file.
+ * @param conf The current configuration.
+ * @throws IOException When opening the reader fails.
+ */
+ StoreFile(final FileSystem fs,
+ final Path p,
+ final Configuration conf,
+ final CacheConfig cacheConf)
+ throws IOException {
+ this(fs, p, conf, cacheConf, BloomType.NONE,
+ new NoOpDataBlockEncoder());
+ }
+
+ /**
* Constructor, loads a reader and it's indices, etc. May allocate a
* substantial amount of ram depending on the underlying files (10-20MB?).
*
@@ -220,17 +246,20 @@ public class StoreFile extends SchemaCon
* as the Bloom filter type actually present in the HFile, because
* column family configuration might change. If this is
* {@link BloomType#NONE}, the existing Bloom filter is ignored.
+ * @param dataBlockEncoder data block encoding algorithm.
* @throws IOException When opening the reader fails.
*/
StoreFile(final FileSystem fs,
final Path p,
final Configuration conf,
final CacheConfig cacheConf,
- final BloomType cfBloomType)
+ final BloomType cfBloomType,
+ final HFileDataBlockEncoder dataBlockEncoder)
throws IOException {
this.fs = fs;
this.path = p;
this.cacheConf = cacheConf;
+ this.dataBlockEncoder = dataBlockEncoder;
if (isReference(p)) {
this.reference = Reference.read(fs, p);
this.referencePath = getReferredToFile(this.path);
@@ -493,9 +522,10 @@ public class StoreFile extends SchemaCon
}
if (isReference()) {
this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
- this.cacheConf, this.reference);
+ this.cacheConf, this.reference, this.dataBlockEncoder);
} else {
- this.reader = new Reader(this.fs, this.path, this.cacheConf);
+ this.reader = new Reader(this.fs, this.path, this.cacheConf,
+ this.dataBlockEncoder);
}
if (isSchemaConfigured()) {
@@ -677,8 +707,8 @@ public class StoreFile extends SchemaCon
public static Writer createWriter(final FileSystem fs, final Path dir,
final int blocksize, Configuration conf, CacheConfig cacheConf)
throws IOException {
- return createWriter(fs, dir, blocksize, null, null, conf, cacheConf,
- BloomType.NONE, 0);
+ return createWriter(fs, dir, blocksize, null, new NoOpDataBlockEncoder(),
+ null, conf, cacheConf, BloomType.NONE, 0);
}
/**
@@ -689,6 +719,7 @@ public class StoreFile extends SchemaCon
* Creates a file with a unique name in this directory.
* @param blocksize
* @param algorithm Pass null to get default.
+ * @param dataBlockEncoder Pass null to disable data block encoding.
* @param c Pass null to get default.
* @param conf HBase system configuration. used with bloom filters
* @param cacheConf Cache configuration and reference.
@@ -701,6 +732,7 @@ public class StoreFile extends SchemaCon
final Path dir,
final int blocksize,
final Compression.Algorithm algorithm,
+ final HFileDataBlockEncoder dataBlockEncoder,
final KeyValue.KVComparator c,
final Configuration conf,
final CacheConfig cacheConf,
@@ -718,7 +750,7 @@ public class StoreFile extends SchemaCon
return new Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
- conf, cacheConf, c == null ? KeyValue.COMPARATOR: c, bloomType,
+ dataBlockEncoder, conf, cacheConf, c == null ? KeyValue.COMPARATOR : c, bloomType,
maxKeyCount);
}
@@ -814,6 +846,8 @@ public class StoreFile extends SchemaCon
private KeyValue lastDeleteFamilyKV = null;
private long deleteFamilyCnt = 0;
+ protected HFileDataBlockEncoder dataBlockEncoder;
+
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
* When flushing a memstore, we set TimeRange and use this variable to
@@ -838,13 +872,16 @@ public class StoreFile extends SchemaCon
* @throws IOException problem writing to FS
*/
public Writer(FileSystem fs, Path path, int blocksize,
- Compression.Algorithm compress, final Configuration conf,
+ Compression.Algorithm compress,
+ HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
CacheConfig cacheConf,
final KVComparator comparator, BloomType bloomType, long maxKeys)
throws IOException {
+ this.dataBlockEncoder = dataBlockEncoder != null ?
+ dataBlockEncoder : new NoOpDataBlockEncoder();
writer = HFile.getWriterFactory(conf, cacheConf).createWriter(
fs, path, blocksize,
- compress, comparator.getRawComparator());
+ compress, this.dataBlockEncoder, comparator.getRawComparator());
this.kvComparator = comparator;
@@ -1081,6 +1118,10 @@ public class StoreFile extends SchemaCon
}
public void close() throws IOException {
+ // (optional) Add data block encoding used to save this file
+ // It is mostly for statistics and debugging purpose.
+ dataBlockEncoder.saveMetadata(this);
+
boolean hasGeneralBloom = this.closeGeneralBloomFilter();
boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
@@ -1119,10 +1160,12 @@ public class StoreFile extends SchemaCon
private byte[] lastBloomKey;
private long deleteFamilyCnt = -1;
- public Reader(FileSystem fs, Path path, CacheConfig cacheConf)
+ public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
+ HFileDataBlockEncoder dataBlockEncoder)
throws IOException {
super(path);
- reader = HFile.createReader(fs, path, cacheConf);
+ reader = HFile.createReader(fs, path, cacheConf,
+ dataBlockEncoder);
bloomFilterType = BloomType.NONE;
}
@@ -1262,7 +1305,7 @@ public class StoreFile extends SchemaCon
default:
return true;
- }
+ }
}
public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
@@ -1312,7 +1355,7 @@ public class StoreFile extends SchemaCon
return true;
byte[] key;
- switch (bloomFilterType) {
+ switch (bloomFilterType) {
case ROW:
if (col != null) {
throw new RuntimeException("Row-only Bloom filter called with " +
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sat Dec 24 21:20:39 2011
@@ -177,7 +177,7 @@ class StoreFileScanner implements KeyVal
realSeekDone = true;
}
} catch (IOException ioe) {
- throw new IOException("Could not seek " + this, ioe);
+ throw new IOException("Could not seek " + this + " " + key, ioe);
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java Sat Dec 24 21:20:39 2011
@@ -83,6 +83,15 @@ public class SchemaConfigured implements
}
/**
+ * Creates an instance corresponding to an unknown table and column family.
+ * Used in unit tests.
+ */
+ public static SchemaConfigured createUnknown() {
+ return new SchemaConfigured(null, SchemaMetrics.UNKNOWN,
+ SchemaMetrics.UNKNOWN);
+ }
+
+ /**
* Default constructor. Only use when column/family name are not known at
* construction (i.e. for HFile blocks).
*/
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.
+ EncoderBufferTooSmallException;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Utility functions for working with byte buffers, such as reading/writing
+ * variable-length long numbers.
+ */
+public final class ByteBufferUtils {
+
+ // "Compressed integer" serialization helper constants.
+ private final static int VALUE_MASK = 0x7f;
+ private final static int NEXT_BIT_SHIFT = 7;
+ private final static int NEXT_BIT_MASK = 1 << 7;
+
+ private ByteBufferUtils() {
+ }
+
+ /**
+ * Similar to {@link WritableUtils#writeVLong(java.io.DataOutput, long)},
+ * but writes to a {@link ByteBuffer}.
+ */
+ public static void writeVLong(ByteBuffer out, long i) {
+ if (i >= -112 && i <= 127) {
+ out.put((byte) i);
+ return;
+ }
+
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement'
+ len = -120;
+ }
+
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ out.put((byte) len);
+
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ long mask = 0xFFL << shiftbits;
+ out.put((byte) ((i & mask) >> shiftbits));
+ }
+ }
+
+ /**
+ * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
+ * {@link ByteBuffer}.
+ */
+ public static long readVLong(ByteBuffer in) {
+ byte firstByte = in.get();
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len-1; idx++) {
+ byte b = in.get();
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+ }
+
+
+ /**
+ * Put in buffer integer using 7 bit encoding. For each written byte:
+ * 7 bits are used to store value
+ * 1 bit is used to indicate whether there is next bit.
+ * @param value Int to be compressed.
+ * @param out Where to put compressed data
+ * @return Number of bytes written.
+ * @throws IOException on stream error
+ */
+ public static int putCompressedInt(OutputStream out, final int value)
+ throws IOException {
+ int i = 0;
+ int tmpvalue = value;
+ do {
+ byte b = (byte) (tmpvalue & VALUE_MASK);
+ tmpvalue >>>= NEXT_BIT_SHIFT;
+ if (tmpvalue != 0) {
+ b |= (byte) NEXT_BIT_MASK;
+ }
+ out.write(b);
+ i++;
+ } while (tmpvalue != 0);
+ return i;
+ }
+
+ /**
+ * Put in output stream 32 bit integer (Big Endian byte order).
+ * @param out Where to put integer.
+ * @param value Value of integer.
+ * @throws IOException On stream error.
+ */
+ public static void putInt(OutputStream out, final int value)
+ throws IOException {
+ for (int i = Bytes.SIZEOF_INT - 1 ; i >= 0 ; --i) {
+ out.write((byte) (value >>> (i * 8)));
+ }
+ }
+
+ /**
+ * Copy byte to the output stream.
+ * @param b byte to be copied
+ * @param out Where to put compressed data
+ * @return Number of written bytes.
+ * @throws IOException on stream error
+ */
+ public static int copyToStream(OutputStream out, byte b)
+ throws IOException {
+ out.write(b);
+ return Bytes.SIZEOF_BYTE;
+ }
+
+ /**
+ * Copy the data to the output stream and update position in buffer.
+ * @param out Write bytes here.
+ * @param buffer Source buffer in certain position.
+ * @param length Length of copy.
+ * @return Number of written bytes.
+ * @throws IOException on stream error
+ */
+ public static int copyToStream(OutputStream out, ByteBuffer buffer,
+ int length) throws IOException {
+ if (buffer.hasArray()) {
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ length);
+ skip(buffer, length);
+ } else {
+ for (int i = 0 ; i < length ; ++i) {
+ out.write(buffer.get());
+ }
+ }
+ return length;
+ }
+
+ /**
+ * Copy the data to the output stream
+ * @param out Write bytes here.
+ * @param in Source buffer in certain position.
+ * @param offset In the buffer from position 0.
+ * @param length Length of copy.
+ * @return Number of written bytes.
+ * @throws IOException on stream error
+ */
+ public static int copyToStream(OutputStream out, ByteBuffer in,
+ int offset, int length) throws IOException {
+ if (in.hasArray()) {
+ out.write(in.array(), in.arrayOffset() + offset,
+ length);
+ } else {
+ for (int i = 0 ; i < length ; ++i) {
+ out.write(in.get(offset + i));
+ }
+ }
+ return length;
+ }
+
+ public static int putLong(OutputStream out, final long length,
+ final int fitInBytes) throws IOException {
+ long tmpLength = length;
+ for (int i = 0 ; i < fitInBytes ; ++i) {
+ out.write((byte) (tmpLength & 0xff));
+ tmpLength >>>= 8;
+ }
+ return fitInBytes;
+ }
+
+ /**
+ * Check how many bytes are required to store value.
+ * @param value Value which size will be tested.
+ * @return How many bytes are required to store value.
+ */
+ public static int longFitsIn(final long value) {
+ if (value < 0) {
+ return 8;
+ }
+
+ if (value < (1l << 4 * 8)) {
+ // no more than 4 bytes
+ if (value < (1l << 2 * 8)) {
+ if (value < (1l << 1 * 8)) {
+ return 1;
+ }
+ return 2;
+ }
+ if (value < (1l << 3 * 8)) {
+ return 3;
+ }
+ return 4;
+ }
+ // more than 4 bytes
+ if (value < (1l << 6 * 8)) {
+ if (value < (1l << 5 * 8)) {
+ return 5;
+ }
+ return 6;
+ }
+ if (value < (1l << 7 * 8)) {
+ return 7;
+ }
+ return 8;
+ }
+
+ /**
+ * Check how many bytes are required to store value.
+ * @param value Value which size will be tested.
+ * @return How many bytes are required to store value.
+ */
+ public static int intFitsIn(final int value) {
+ if (value < 0) {
+ return 4;
+ }
+
+ if (value < (1 << 2 * 8)) {
+ if (value < (1 << 1 * 8)) {
+ return 1;
+ }
+ return 2;
+ }
+ if (value <= (1 << 3 * 8)) {
+ return 3;
+ }
+ return 4;
+ }
+
+ /**
+ * Read integer from stream coded in 7 bits and increment position.
+ * @return Read integer.
+ * @throws IOException
+ */
+ public static int readCompressedInt(InputStream input)
+ throws IOException {
+ int result = 0;
+ int i = 0;
+ byte b;
+ do {
+ b = (byte) input.read();
+ result += (b & VALUE_MASK) << (NEXT_BIT_SHIFT * i);
+ i++;
+ if (i > Bytes.SIZEOF_INT + 1) {
+ throw new IllegalStateException(
+ "Corrupted compressed int (too long: " + (i + 1) + " bytes)");
+ }
+ } while (0 != (b & NEXT_BIT_MASK));
+ return result;
+ }
+
+ /**
+ * Read integer from buffer coded in 7 bits and increment position.
+ * @return Read integer.
+ */
+ public static int readCompressedInt(ByteBuffer buffer) {
+ byte b = buffer.get();
+ if ((b & NEXT_BIT_MASK) != 0) {
+ return (b & VALUE_MASK) + (readCompressedInt(buffer) << NEXT_BIT_SHIFT);
+ }
+ return b & VALUE_MASK;
+ }
+
+ /**
+ * Read long which was written to fitInBytes bytes and increment position.
+ * @param fitInBytes In how many bytes given long is stored.
+ * @return The value of parsed long.
+ * @throws IOException
+ */
+ public static long readLong(InputStream input, final int fitInBytes)
+ throws IOException {
+ long tmpLong = 0;
+ for (int i = 0 ; i < fitInBytes ; ++i) {
+ tmpLong |= (input.read() & 0xffl) << (8 * i);
+ }
+ return tmpLong;
+ }
+
+ /**
+ * Read long which was written to fitInBytes bytes and increment position.
+ * @param fitInBytes In how many bytes given long is stored.
+ * @return The value of parsed long.
+ */
+ public static long readLong(ByteBuffer buffer, final int fitInBytes) {
+ long tmpLength = 0;
+ for (int i = 0 ; i < fitInBytes ; ++i) {
+ tmpLength |= (buffer.get() & 0xffl) << (8l * i);
+ }
+ return tmpLength;
+ }
+
+ /**
+ * Asserts that we have 'length' bytes remaining in 'buffer'.
+ * @param buffer Where are we looking for remaining bytes.
+ * @param length How many bytes do we need.
+ * @throws EncoderBufferTooSmallException If there are not enough bytes.
+ */
+ public static void ensureSpace(ByteBuffer buffer, int length)
+ throws EncoderBufferTooSmallException {
+ if (buffer.position() + length > buffer.limit()) {
+ throw new EncoderBufferTooSmallException(
+ "Buffer position=" + buffer.position() +
+ ", buffer limit=" + buffer.limit() +
+ ", length to be written=" + length);
+ }
+ }
+
+ /**
+ * Copy 'length' bytes from 'source' and put it at the current position of
+ * 'buffer'. Update position in 'buffer' afterwards.
+ * @param source From where data should be read.
+ * @param buffer Write data here.
+ * @param length Read that many bytes.
+ * @throws IOException If there is problem in source.
+ */
+ public static void copyFromStream(DataInputStream source,
+ ByteBuffer buffer, int length) throws IOException {
+ if (buffer.hasArray()) {
+ source.readFully(buffer.array(), buffer.position() + buffer.arrayOffset(),
+ length);
+ skip(buffer, length);
+ } else {
+ for (int i = 0 ; i < length ; ++i) {
+ buffer.put(source.readByte());
+ }
+ }
+ }
+
+ /**
+ * Copy from one buffer to another from given offset
+ * @param source From where copy.
+ * @param destination Where to copy.
+ * @param sourceOffset Offset in the source buffer
+ * @param length How many bytes will be copied.
+ * @throws IOException
+ */
+ public static void copyFromBuffer(ByteBuffer source,
+ ByteBuffer destination, int sourceOffset, int length) {
+ if (source.hasArray() && destination.hasArray()) {
+ System.arraycopy(source.array(), sourceOffset + source.arrayOffset(),
+ destination.array(), destination.position() +
+ destination.arrayOffset(), length);
+ skip(destination, length);
+ } else {
+ for (int i = 0 ; i < length ; ++i) {
+ destination.put(source.get(sourceOffset + i));
+ }
+ }
+ }
+
+ /**
+ * Find length of common prefix of two parts in the buffer
+ * @param buffer Where parts are located.
+ * @param offsetLeft Offset of the first part.
+ * @param offsetRight Offset of the second part.
+ * @param limit Maximal length of common prefix.
+ * @return Length of prefix.
+ */
+ public static int findCommonPrefix(ByteBuffer buffer, int offsetLeft,
+ int offsetRight, int limit) {
+ int prefix = 0;
+
+ for (; prefix < limit ; ++prefix) {
+ if (buffer.get(offsetLeft + prefix) != buffer.get(offsetRight + prefix)) {
+ break;
+ }
+ }
+
+ return prefix;
+ }
+
+ /**
+ * Find length of common prefix in two arrays.
+ * @param left Array to be compared.
+ * @param leftOffset Offset in left array.
+ * @param leftLength Length of left array.
+ * @param right Array to be compared.
+ * @param rightOffset Offset in right array.
+ * @param rightLength Length of right array.
+ */
+ public static int findCommonPrefix(
+ byte[] left, int leftOffset, int leftLength,
+ byte[] right, int rightOffset, int rightLength) {
+ int length = Math.min(leftLength, rightLength);
+ int result = 0;
+
+ while (result < length &&
+ left[leftOffset + result] == right[rightOffset + result]) {
+ result++;
+ }
+
+ return result;
+ }
+
+ /**
+ * Check whether two parts in the same buffer are equal.
+ * @param buffer In which buffer there are parts
+ * @param offsetLeft Beginning of first part.
+ * @param lengthLeft Length of the first part.
+ * @param offsetRight Beginning of the second part.
+ * @param lengthRight Length of the second part.
+ * @return True if the two parts are equal, false otherwise.
+ */
+ public static boolean arePartsEqual(ByteBuffer buffer,
+ int offsetLeft, int lengthLeft,
+ int offsetRight, int lengthRight) {
+ if (lengthLeft != lengthRight) {
+ return false;
+ }
+
+ if (buffer.hasArray()) {
+ return 0 == Bytes.compareTo(
+ buffer.array(), buffer.arrayOffset() + offsetLeft, lengthLeft,
+ buffer.array(), buffer.arrayOffset() + offsetRight, lengthRight);
+ }
+
+ for (int i = 0 ; i < lengthRight ; ++i) {
+ if (buffer.get(offsetLeft + i) != buffer.get(offsetRight + i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Increment position in buffer.
+ * @param buffer In this buffer.
+ * @param length By that many bytes.
+ */
+ public static void skip(ByteBuffer buffer, int length) {
+ buffer.position(buffer.position() + length);
+ }
+
+ /**
+ * Read int, assuming it is stored in N bytes with no special encoding.
+ * @param source From where read bytes.
+ * @param intLength How long is the integer
+ * @return The value of the integer.
+ * @throws IOException On IO error.
+ */
+ public static int readCompressedInt(InputStream source, int intLength)
+ throws IOException {
+ int result = 0;
+ for (int i = 0 ; i < intLength ; ++i) {
+ result = (result << 8) + (source.read() & 0xff);
+ }
+ return result;
+ }
+
+ /**
+ * Read int, assuming it is stored in N bytes with no special encoding.
+ * @param buffer Read bytes from this buffer.
+ * @param intLength The length of the integer in bytes.
+ * @return The value of the integer.
+ */
+ public static int readCompressedInt(ByteBuffer buffer, int intLength) {
+ int result = 0;
+ for (int i = 0 ; i < intLength ; ++i) {
+ result = (result << 8) + (buffer.get() & 0xff);
+ }
+ return result;
+ }
+
+}
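[Editor's note, not part of the commit] A small round-trip example of the 7-bit "compressed integer" helpers added above; the expected byte counts follow directly from the encoding described in the putCompressedInt javadoc.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.ByteBufferUtils;

public class CompressedIntSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // 300 needs two 7-bit groups (0101100 and 0000010), so two bytes.
    int written = ByteBufferUtils.putCompressedInt(out, 300);
    assert written == 2;

    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    assert ByteBufferUtils.readCompressedInt(in) == 300;

    // longFitsIn/intFitsIn report how many plain bytes a value needs.
    assert ByteBufferUtils.longFitsIn(300L) == 2;
    assert ByteBufferUtils.intFitsIn(70000) == 3;
  }
}

Run with assertions enabled (java -ea) to check the expected values.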
Modified: hbase/trunk/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/admin.rb?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/admin.rb Sat Dec 24 21:20:39 2011
@@ -532,6 +532,9 @@ module Hbase
family.setInMemory(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
family.setTimeToLive(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::TTL])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
+ family.setDataBlockEncodingOnDisk(org.apache.hadoop.hbase.io.encoding.DataBlockEncodingAlgorithms::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_ON_DISK])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_ON_DISK)
+ family.setDataBlockEncodingInCache(org.apache.hadoop.hbase.io.encoding.DataBlockEncodingAlgorithms::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_IN_CACHE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_IN_CACHE)
+ family.setEncodedDataBlockSeek(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::ENCODED_DATA_BLOCK_SEEK])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCODED_DATA_BLOCK_SEEK)
family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
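[Editor's note, not part of the commit] The Java-side equivalent of the three new shell attributes wired up above, sketched with the HColumnDescriptor default constants that the tests below use; the concrete encoding enum values are not shown in this part of the commit, so the defaults stand in for them and the setter/constant types are assumed to match.

import org.apache.hadoop.hbase.HColumnDescriptor;

public class FamilyEncodingSketch {
  static HColumnDescriptor encodedFamily() {
    HColumnDescriptor family = new HColumnDescriptor("cf");
    // DATA_BLOCK_ENCODING_ON_DISK attribute in the shell
    family.setDataBlockEncodingOnDisk(
        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK);
    // DATA_BLOCK_ENCODING_IN_CACHE attribute in the shell
    family.setDataBlockEncodingInCache(
        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE);
    // ENCODED_DATA_BLOCK_SEEK attribute in the shell
    family.setEncodedDataBlockSeek(
        HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK);
    return family;
  }
}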
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java Sat Dec 24 21:20:39 2011
@@ -221,18 +221,33 @@ public abstract class HBaseTestCase exte
final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
- keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
- HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
- HColumnDescriptor.DEFAULT_BLOOMFILTER,
- HConstants.REPLICATION_SCOPE_LOCAL));
+ keepDeleted,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+ HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+ false, false,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
- keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+ keepDeleted,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+ HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+ false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
- keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
- HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+ keepDeleted,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+ HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+ false, false,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
return htd;
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java Sat Dec 24 21:20:39 2011
@@ -191,7 +191,8 @@ public class HFilePerformanceEvaluation
void setUp() throws Exception {
writer =
HFile.getWriterFactoryNoCache(conf).createWriter(this.fs,
- this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null);
+ this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null,
+ null);
}
@Override
@@ -365,4 +366,4 @@ public class HFilePerformanceEvaluation
public static void main(String[] args) throws Exception {
new HFilePerformanceEvaluation().runBenchmarks();
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Sat Dec 24 21:20:39 2011
@@ -143,6 +143,9 @@ public class TestFromClientSide {
HColumnDescriptor.DEFAULT_VERSIONS,
true,
HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+ HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+ HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_BLOCKSIZE,
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java Sat Dec 24 21:20:39 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
@@ -98,7 +99,7 @@ public class TestHalfStoreFileReader {
CacheConfig cacheConf)
throws IOException {
final HalfStoreFileReader halfreader =
- new HalfStoreFileReader(fs, p, cacheConf, bottom);
+ new HalfStoreFileReader(fs, p, cacheConf, bottom, null);
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Sat Dec 24 21:20:39 2011
@@ -295,7 +295,14 @@ public class TestHeapSize extends TestCa
assertEquals(expected, actual);
}
+ // SchemaConfigured
+ LOG.debug("Heap size for: " + SchemaConfigured.class.getName());
+ SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
+ assertEquals(ClassSize.estimateBase(SchemaConfigured.class, true),
+ sc.heapSize());
+
// Store Overhead
+ LOG.debug("Heap size for: " + Store.class.getName());
cl = Store.class;
actual = Store.FIXED_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
@@ -319,10 +326,6 @@ public class TestHeapSize extends TestCa
// accounted for. But we have satisfied our two core requirements.
// Sizing is quite accurate now, and our tests will throw errors if
// any of these classes are modified without updating overhead sizes.
-
- SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
- assertEquals(ClassSize.estimateBase(SchemaConfigured.class, true),
- sc.heapSize());
}
@org.junit.Rule
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Sat Dec 24 21:20:39 2011
@@ -41,9 +41,15 @@ import org.apache.hadoop.hbase.regionser
public class CacheTestUtils {
- /*Just checks if heapsize grows when something is cached, and gets smaller when the same object is evicted*/
+ private static final boolean includesMemstoreTS = true;
- public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize){
+ /**
+ * Just checks if heapsize grows when something is cached, and gets smaller
+ * when the same object is evicted
+ */
+
+ public static void testHeapSizeChanges(final BlockCache toBeTested,
+ final int blockSize) {
HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
long heapSize = ((HeapSize) toBeTested).heapSize();
toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
@@ -316,7 +322,8 @@ public class CacheTestUtils {
HFileBlock generated = new HFileBlock(BlockType.DATA,
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
- prevBlockOffset, cachedBuffer, false, blockSize);
+ prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
+ blockSize, includesMemstoreTS);
String strKey;
/* No conflicting keys */
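For reference, the HFileBlock constructor called here now takes the named constants HFileBlock.DONT_FILL_HEADER / HFileBlock.FILL_HEADER in place of the old bare boolean, plus the includesMemstoreTS flag. A sketch mirroring the call above; argument names follow the surrounding test utility, and the buffer is assumed to hold a serialized block body:

HFileBlock generated = new HFileBlock(
    BlockType.DATA,
    onDiskSizeWithoutHeader,        // on-disk size, header excluded
    uncompressedSizeWithoutHeader,  // uncompressed size, header excluded
    prevBlockOffset,
    cachedBuffer,                   // ByteBuffer with the block body
    HFileBlock.DONT_FILL_HEADER,    // named constant replacing the old boolean
    blockSize,
    includesMemstoreTS);            // new flag threaded through this patch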
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Sat Dec 24 21:20:39 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.junit.After;
@@ -42,6 +43,7 @@ import org.junit.experimental.categories
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+
import static org.junit.Assert.*;
/**
@@ -61,10 +63,12 @@ public class TestCacheOnWrite {
private FileSystem fs;
private Random rand = new Random(12983177L);
private Path storeFilePath;
- private Compression.Algorithm compress;
- private CacheOnWriteType cowType;
private BlockCache blockCache;
- private String testName;
+ private String testDescription;
+
+ private final CacheOnWriteType cowType;
+ private final Compression.Algorithm compress;
+ private final BlockEncoderTestType encoderType;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 25000;
@@ -76,49 +80,90 @@ public class TestCacheOnWrite {
KeyValue.Type.values().length - 2;
private static enum CacheOnWriteType {
- DATA_BLOCKS(BlockType.DATA, CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY),
- BLOOM_BLOCKS(BlockType.BLOOM_CHUNK,
- CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY),
- INDEX_BLOCKS(BlockType.LEAF_INDEX,
- CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY);
+ DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
+ BlockType.DATA, BlockType.ENCODED_DATA),
+ BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+ BlockType.BLOOM_CHUNK),
+ INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+ BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);
private final String confKey;
- private final BlockType inlineBlockType;
+ private final BlockType blockType1;
+ private final BlockType blockType2;
+
+ private CacheOnWriteType(String confKey, BlockType blockType) {
+ this(confKey, blockType, blockType);
+ }
- private CacheOnWriteType(BlockType inlineBlockType, String confKey) {
- this.inlineBlockType = inlineBlockType;
+ private CacheOnWriteType(String confKey, BlockType blockType1,
+ BlockType blockType2) {
+ this.blockType1 = blockType1;
+ this.blockType2 = blockType2;
this.confKey = confKey;
}
public boolean shouldBeCached(BlockType blockType) {
- return blockType == inlineBlockType
- || blockType == BlockType.INTERMEDIATE_INDEX
- && inlineBlockType == BlockType.LEAF_INDEX;
+ return blockType == blockType1 || blockType == blockType2;
}
public void modifyConf(Configuration conf) {
- for (CacheOnWriteType cowType : CacheOnWriteType.values())
+ for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
conf.setBoolean(cowType.confKey, cowType == this);
+ }
}
}
+ private static final DataBlockEncodings.Algorithm ENCODING_ALGO =
+ DataBlockEncodings.Algorithm.PREFIX;
+
+ /** Provides fancy names for four combinations of two booleans */
+ private static enum BlockEncoderTestType {
+ NO_BLOCK_ENCODING(false, false),
+ BLOCK_ENCODING_IN_CACHE_ONLY(false, true),
+ BLOCK_ENCODING_ON_DISK_ONLY(true, false),
+ BLOCK_ENCODING_EVERYWHERE(true, true);
+
+ private final boolean encodeOnDisk;
+ private final boolean encodeInCache;
+
+ BlockEncoderTestType(boolean encodeOnDisk, boolean encodeInCache) {
+ this.encodeOnDisk = encodeOnDisk;
+ this.encodeInCache = encodeInCache;
+ }
+
+ public HFileDataBlockEncoder getEncoder() {
+ // We always use an encoded seeker. It should have no effect if there
+ // is no encoding in the cache.
+ return new HFileDataBlockEncoderImpl(
+ encodeOnDisk ? ENCODING_ALGO : DataBlockEncodings.Algorithm.NONE,
+ encodeInCache ? ENCODING_ALGO : DataBlockEncodings.Algorithm.NONE,
+ true);
+ }
+ }
+
public TestCacheOnWrite(CacheOnWriteType cowType,
- Compression.Algorithm compress) {
+ Compression.Algorithm compress, BlockEncoderTestType encoderType) {
this.cowType = cowType;
this.compress = compress;
- testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]";
- System.out.println(testName);
+ this.encoderType = encoderType;
+ testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
+ ", encoderType=" + encoderType + "]";
+ System.out.println(testDescription);
}
@Parameters
public static Collection<Object[]> getParameters() {
List<Object[]> cowTypes = new ArrayList<Object[]>();
- for (CacheOnWriteType cowType : CacheOnWriteType.values())
+ for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
for (Compression.Algorithm compress :
HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
- cowTypes.add(new Object[] { cowType, compress });
+ for (BlockEncoderTestType encoderType :
+ BlockEncoderTestType.values()) {
+ cowTypes.add(new Object[] { cowType, compress, encoderType });
+ }
}
+ }
return cowTypes;
}
@@ -156,10 +201,10 @@ public class TestCacheOnWrite {
private void readStoreFile() throws IOException {
HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
- storeFilePath, cacheConf);
+ storeFilePath, cacheConf, encoderType.getEncoder());
LOG.info("HFile information: " + reader);
HFileScanner scanner = reader.getScanner(false, false);
- assertTrue(testName, scanner.seekTo());
+ assertTrue(testDescription, scanner.seekTo());
long offset = 0;
HFileBlock prevBlock = null;
@@ -174,10 +219,11 @@ public class TestCacheOnWrite {
// Flags: don't cache the block, use pread, this is not a compaction.
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
false);
- BlockCacheKey blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset);
+ BlockCacheKey blockCacheKey = HFile.getBlockCacheKey(reader.getName(),
+ offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true) != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
- assertEquals(testName + " " + block, shouldBeCached, isCached);
+ assertEquals(testDescription + " " + block, shouldBeCached, isCached);
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
BlockType bt = block.getBlockType();
@@ -187,8 +233,10 @@ public class TestCacheOnWrite {
LOG.info("Block count by type: " + blockCountByType);
String countByType = blockCountByType.toString();
- assertEquals(
- "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+ BlockType cachedDataBlockType =
+ encoderType.encodeInCache ? BlockType.ENCODED_DATA : BlockType.DATA;
+ assertEquals("{" + cachedDataBlockType
+ + "=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
countByType);
reader.close();
@@ -214,8 +262,9 @@ public class TestCacheOnWrite {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
"test_cache_on_write");
StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
- DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
- cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV);
+ DATA_BLOCK_SIZE, compress, encoderType.getEncoder(),
+ KeyValue.COMPARATOR, conf, cacheConf, StoreFile.BloomType.ROWCOL,
+ NUM_KV);
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {
@@ -236,7 +285,6 @@ public class TestCacheOnWrite {
storeFilePath = sfw.getPath();
}
-
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
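Taken together, the TestCacheOnWrite changes make the encoder an explicit argument on both the write and the read path. A condensed sketch of the round trip the test performs — not runnable on its own, assuming fs, conf, cacheConf, storeFileParentDir and the KeyValue-append loop from writeStoreFile():

HFileDataBlockEncoder encoder =
    BlockEncoderTestType.BLOCK_ENCODING_EVERYWHERE.getEncoder();

// Write path: the encoder is handed to the StoreFile writer.
StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
    DATA_BLOCK_SIZE, Compression.Algorithm.GZ, encoder,
    KeyValue.COMPARATOR, conf, cacheConf, StoreFile.BloomType.ROWCOL,
    NUM_KV);
// ... append KeyValues and close the writer, as in writeStoreFile() ...

// Read path: the same encoder is handed to the reader, so with in-cache
// encoding enabled the cached data blocks show up as ENCODED_DATA.
HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
    sfw.getPath(), cacheConf, encoder);
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();
reader.close();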
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Sat Dec 24 21:20:39 2011
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,16 +47,24 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
@Category(MediumTests.class)
+@RunWith(Parameterized.class)
public class TestHFileBlock {
// change this value to activate more logs
private static final boolean detailedLogging = false;
@@ -69,14 +79,29 @@ public class TestHFileBlock {
static final Compression.Algorithm[] GZIP_ONLY = { GZ };
private static final int NUM_TEST_BLOCKS = 1000;
-
private static final int NUM_READER_THREADS = 26;
+ // Used to generate KeyValues
+ private static int NUM_KEYVALUES = 50;
+ private static int FIELD_LENGTH = 10;
+ private static float CHANCE_TO_REPEAT = 0.6f;
+
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private FileSystem fs;
private int uncompressedSizeV1;
+ private final boolean includesMemstoreTS;
+
+ public TestHFileBlock(boolean includesMemstoreTS) {
+ this.includesMemstoreTS = includesMemstoreTS;
+ }
+
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+ }
+
@Before
public void setUp() throws IOException {
fs = FileSystem.get(TEST_UTIL.getConfiguration());
@@ -88,6 +113,74 @@ public class TestHFileBlock {
dos.writeInt(i / 100);
}
+ private int writeTestKeyValues(OutputStream dos, int seed)
+ throws IOException {
+ List<KeyValue> keyValues = new ArrayList<KeyValue>();
+ Random randomizer = new Random(42L + seed); // just any fixed number
+
+ // generate keyValues
+ for (int i = 0; i < NUM_KEYVALUES; ++i) {
+ byte[] row;
+ long timestamp;
+ byte[] family;
+ byte[] qualifier;
+ byte[] value;
+
+ // generate it or repeat, it should compress well
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
+ } else {
+ row = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(row);
+ }
+ if (0 == i) {
+ family = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(family);
+ } else {
+ family = keyValues.get(0).getFamily();
+ }
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ qualifier = keyValues.get(
+ randomizer.nextInt(keyValues.size())).getQualifier();
+ } else {
+ qualifier = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(qualifier);
+ }
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
+ } else {
+ value = new byte[FIELD_LENGTH];
+ randomizer.nextBytes(value);
+ }
+ if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+ timestamp = keyValues.get(
+ randomizer.nextInt(keyValues.size())).getTimestamp();
+ } else {
+ timestamp = randomizer.nextLong();
+ }
+
+ keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
+ }
+
+ // sort it and write to stream
+ int totalSize = 0;
+ Collections.sort(keyValues, KeyValue.COMPARATOR);
+ DataOutputStream dataOutputStream = new DataOutputStream(dos);
+ for (KeyValue kv : keyValues) {
+ totalSize += kv.getLength();
+ dataOutputStream.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+ if (includesMemstoreTS) {
+ long memstoreTS = randomizer.nextLong();
+ WritableUtils.writeVLong(dataOutputStream, memstoreTS);
+ totalSize += WritableUtils.getVIntSize(memstoreTS);
+ }
+ }
+
+ return totalSize;
+ }
+
+
+
public byte[] createTestV1Block(Compression.Algorithm algo)
throws IOException {
Compressor compressor = algo.getCompressor();
@@ -105,8 +198,9 @@ public class TestHFileBlock {
private byte[] createTestV2Block(Compression.Algorithm algo)
throws IOException {
final BlockType blockType = BlockType.DATA;
- HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
- DataOutputStream dos = hbw.startWriting(blockType, false);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+ includesMemstoreTS);
+ DataOutputStream dos = hbw.startWriting(blockType);
writeTestBlockContents(dos);
byte[] headerAndData = hbw.getHeaderAndData();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
@@ -194,10 +288,11 @@ public class TestHFileBlock {
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
- HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+ includesMemstoreTS);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
- DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
+ DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1234; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
@@ -240,6 +335,97 @@ public class TestHFileBlock {
}
}
+ /**
+ * Test encoding/decoding data blocks.
+ * @throws IOException if there is a bug or a problem with temporary files.
+ */
+ @Test
+ public void testDataBlockEncoding() throws IOException {
+ for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+ for (boolean pread : new boolean[] { false, true }) {
+ for (DataBlockEncodings.Algorithm dataBlockEncoderAlgo :
+ DataBlockEncodings.Algorithm.values()) {
+ Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ + algo + "_" + dataBlockEncoderAlgo.toString());
+ FSDataOutputStream os = fs.create(path);
+ HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
+ dataBlockEncoderAlgo,
+ DataBlockEncodings.Algorithm.NONE,
+ HFileDataBlockEncoderImpl.NO_ENCODED_SEEK);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(algo, dataBlockEncoder,
+ includesMemstoreTS);
+ long totalSize = 0;
+ List<Integer> blockSizes = new ArrayList<Integer>();
+ List<ByteBuffer> blockContent = new ArrayList<ByteBuffer>();
+ for (int blockId = 0; blockId < 2; ++blockId) {
+ DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DoubleOutputStream doubleOutputStream =
+ new DoubleOutputStream(dos, baos);
+
+ blockSizes.add(writeTestKeyValues(doubleOutputStream, blockId));
+
+ ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+ buf.rewind();
+ blockContent.add(buf);
+
+ hbw.writeHeaderAndData(os);
+ totalSize += hbw.getOnDiskSizeWithHeader();
+ }
+ os.close();
+
+ FSDataInputStream is = fs.open(path);
+ HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, algo,
+ totalSize, dataBlockEncoder);
+ hbr.setIncludesMemstoreTS(includesMemstoreTS);
+
+ HFileBlock b;
+ int pos = 0;
+ for (int blockId = 0; blockId < 2; ++blockId) {
+ b = hbr.readBlockData(pos, -1, -1, pread);
+ b.sanityCheck();
+ pos += b.getOnDiskSizeWithHeader();
+
+ assertEquals((int) blockSizes.get(blockId),
+ b.getUncompressedSizeWithoutHeader());
+ ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+ ByteBuffer expectedBuffer = blockContent.get(blockId);
+ expectedBuffer.rewind();
+
+ // test if content matches, produce nice message
+ if (!actualBuffer.equals(expectedBuffer)) {
+ int prefix = 0;
+ while (prefix < expectedBuffer.limit() &&
+ expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
+ prefix++;
+ }
+
+ int kvCount = 0;
+
+ while (actualBuffer.position() + 2 * Bytes.SIZEOF_INT <
+ actualBuffer.limit()) {
+ int keyLength = actualBuffer.getInt();
+ int valueLength = actualBuffer.getInt();
+ kvCount++;
+ actualBuffer.position(actualBuffer.position() +
+ keyLength + valueLength);
+ }
+
+ fail(String.format(
+ "Content mismath compression: %s encoding: %s" +
+ " pread: %s commonPrefix: %d kvCount: %d" +
+ " expected char: %s actual char %s", algo.getName(),
+ dataBlockEncoderAlgo.toString(), pread, prefix, kvCount,
+ expectedBuffer.get(prefix),
+ actualBuffer.get(prefix)));
+ }
+ }
+ is.close();
+ }
+ }
+ }
+ }
+
@Test
public void testPreviousOffset() throws IOException {
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
@@ -446,13 +632,17 @@ public class TestHFileBlock {
) throws IOException {
boolean cacheOnWrite = expectedContents != null;
FSDataOutputStream os = fs.create(path);
- HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo, null,
+ includesMemstoreTS);
Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
long totalSize = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+ if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+ blockTypeOrdinal = BlockType.DATA.ordinal();
+ }
BlockType bt = BlockType.values()[blockTypeOrdinal];
- DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
+ DataOutputStream dos = hbw.startWriting(bt);
for (int j = 0; j < rand.nextInt(500); ++j) {
// This might compress well.
dos.writeShort(i + 1);
@@ -501,7 +691,7 @@ public class TestHFileBlock {
byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
- true, -1);
+ HFileBlock.FILL_HEADER, -1, includesMemstoreTS);
long byteBufferExpectedSize =
ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
+ HFileBlock.HEADER_SIZE + size);
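The block-level API exercised by testDataBlockEncoding boils down to a write/read round trip with the encoder passed to both sides. A stripped-down sketch covering one block and no content verification, under the same assumptions as the test (fs, path, algo and includesMemstoreTS provided by the surrounding code):

HFileDataBlockEncoder encoder = new HFileDataBlockEncoderImpl(
    DataBlockEncodings.Algorithm.PREFIX,
    DataBlockEncodings.Algorithm.NONE,
    HFileDataBlockEncoderImpl.NO_ENCODED_SEEK);

// Write one encoded data block.
FSDataOutputStream os = fs.create(path);
HFileBlock.Writer hbw = new HFileBlock.Writer(algo, encoder,
    includesMemstoreTS);
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
// ... write sorted KeyValues (plus memstore TS when enabled) to dos ...
hbw.writeHeaderAndData(os);
long totalSize = hbw.getOnDiskSizeWithHeader();
os.close();

// Read it back through a reader that knows about the encoder.
FSDataInputStream is = fs.open(path);
HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, algo, totalSize,
    encoder);
hbr.setIncludesMemstoreTS(includesMemstoreTS);
HFileBlock b = hbr.readBlockData(0, -1, -1, false);
b.sanityCheck();
is.close();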
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java Sat Dec 24 21:20:39 2011
@@ -20,6 +20,10 @@
package org.apache.hadoop.hbase.io.hfile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -44,7 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
-
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -52,8 +55,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.*;
-
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestHFileBlockIndex {
@@ -92,6 +93,8 @@ public class TestHFileBlockIndex {
private static final int[] UNCOMPRESSED_INDEX_SIZES =
{ 19187, 21813, 23086 };
+ private static final boolean includesMemstoreTS = true;
+
static {
assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length;
assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length;
@@ -210,13 +213,14 @@ public class TestHFileBlockIndex {
private void writeWholeIndex() throws IOException {
assertEquals(0, keys.size());
- HFileBlock.Writer hbw = new HFileBlock.Writer(compr);
+ HFileBlock.Writer hbw = new HFileBlock.Writer(compr, null,
+ includesMemstoreTS);
FSDataOutputStream outputStream = fs.create(path);
HFileBlockIndex.BlockIndexWriter biw =
new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
- hbw.startWriting(BlockType.DATA, false).write(
+ hbw.startWriting(BlockType.DATA).write(
String.valueOf(rand.nextInt(1000)).getBytes());
long blockOffset = outputStream.getPos();
hbw.writeHeaderAndData(outputStream);
@@ -251,7 +255,7 @@ public class TestHFileBlockIndex {
boolean isClosing) throws IOException {
while (biw.shouldWriteBlock(isClosing)) {
long offset = outputStream.getPos();
- biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false));
+ biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType()));
hbw.writeHeaderAndData(outputStream);
biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(),
hbw.getUncompressedSizeWithoutHeader());
@@ -479,7 +483,7 @@ public class TestHFileBlockIndex {
{
HFile.Writer writer =
HFile.getWriterFactory(conf, cacheConf).createWriter(fs,
- hfilePath, SMALL_BLOCK_SIZE, compr, KeyValue.KEY_COMPARATOR);
+ hfilePath, SMALL_BLOCK_SIZE, compr, null, KeyValue.KEY_COMPARATOR);
Random rand = new Random(19231737);
for (int i = 0; i < NUM_KV; ++i) {