Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC
svn commit: r1223020 [3/5] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/io/encoding/
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/mapr...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Sat Dec 24 21:20:39 2011
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
+
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -31,11 +34,12 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.CompoundBloomFilter;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.IOUtils;
@@ -45,9 +49,6 @@ import org.apache.hadoop.io.compress.Dec
import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
-import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
-
/**
* Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
* <ul>
@@ -75,10 +76,26 @@ import static org.apache.hadoop.hbase.io
*/
public class HFileBlock extends SchemaConfigured implements Cacheable {
+ public static final boolean FILL_HEADER = true;
+ public static final boolean DONT_FILL_HEADER = false;
+
/** The size of a version 2 {@link HFile} block header */
public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
+ Bytes.SIZEOF_LONG;
+ /**
+ * We store a two-byte encoder ID at the beginning of every encoded data
+ * block payload (immediately after the block header).
+ */
+ public static final int DATA_BLOCK_ENCODER_ID_SIZE = Bytes.SIZEOF_SHORT;
+
+ /**
+ * The size of the block header when blockType is
+ * {@link BlockType#ENCODED_DATA}. It extends the normal header by adding
+ * the two-byte encoder ID.
+ */
+ public static final int ENCODED_HEADER_SIZE = HEADER_SIZE
+ + DATA_BLOCK_ENCODER_ID_SIZE;
+
/** Just an array of bytes of the right size. */
public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
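For reference, the header layout implied by these constants, as a minimal
arithmetic sketch assuming MAGIC_LENGTH is 8 (the length of block magics
such as "DATABLK*"):

    // Sketch only; mirrors the constant definitions above.
    int headerSize = 8       // block magic (MAGIC_LENGTH)
                   + 4       // onDiskSizeWithoutHeader (int)
                   + 4       // uncompressedSizeWithoutHeader (int)
                   + 8;      // prevBlockOffset (long)   => 24 bytes
    int encodedHeaderSize = headerSize + 2;  // + encoder ID (short) => 26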
@@ -107,10 +124,11 @@ public class HFileBlock extends SchemaCo
};
private BlockType blockType;
- private final int onDiskSizeWithoutHeader;
+ private int onDiskSizeWithoutHeader;
private final int uncompressedSizeWithoutHeader;
private final long prevBlockOffset;
private ByteBuffer buf;
+ private boolean includesMemstoreTS;
/**
* The offset of this block in the file. Populated by the reader for
@@ -146,7 +164,7 @@ public class HFileBlock extends SchemaCo
*/
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
- boolean fillHeader, long offset) {
+ boolean fillHeader, long offset, boolean includesMemstoreTS) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@@ -155,6 +173,7 @@ public class HFileBlock extends SchemaCo
if (fillHeader)
overwriteHeader();
this.offset = offset;
+ this.includesMemstoreTS = includesMemstoreTS;
}
/**
@@ -177,6 +196,15 @@ public class HFileBlock extends SchemaCo
return blockType;
}
+ /** @return the data block encoding ID that was used to encode this block */
+ public short getDataBlockEncodingId() {
+ if (blockType != BlockType.ENCODED_DATA) {
+ throw new IllegalArgumentException("Querying encoder ID of a block " +
+ "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
+ }
+ return buf.getShort(HEADER_SIZE);
+ }
+
/**
* @return the on-disk size of the block with header size included
*/
@@ -509,29 +537,30 @@ public class HFileBlock extends SchemaCo
/** Compression algorithm for all blocks this instance writes. */
private final Compression.Algorithm compressAlgo;
- /**
- * The stream we use to accumulate data in the on-disk format for each
- * block (i.e. compressed data, or uncompressed if using no compression).
- * We reset this stream at the end of each block and reuse it. The header
- * is written as the first {@link #HEADER_SIZE} bytes into this stream.
- */
- private ByteArrayOutputStream baosOnDisk;
+ /** Data block encoder used for data blocks */
+ private final HFileDataBlockEncoder dataBlockEncoder;
/**
- * The stream we use to accumulate uncompressed block data for
- * cache-on-write. Null when cache-on-write is turned off.
+ * The stream we use to accumulate data in uncompressed format for each
+ * block. We reset this stream at the end of each block and reuse it. The
+ * header is written as the first {@link #HEADER_SIZE} bytes into this
+ * stream.
*/
private ByteArrayOutputStream baosInMemory;
/** Compressor, which is also reused between consecutive blocks. */
private Compressor compressor;
- /** Current block type. Set in {@link #startWriting(BlockType)}. */
+ /**
+ * Current block type. Set in {@link #startWriting(BlockType)}. Could be
+ * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
+ * to {@link BlockType#ENCODED_DATA}.
+ */
private BlockType blockType;
/**
- * A stream that we write uncompressed bytes to, which compresses them and
- * writes them to {@link #baosOnDisk}.
+ * A stream that the user writes uncompressed bytes to. The bytes accumulate
+ * in {@link #baosInMemory} and are compressed in {@link #finishBlock()} if
+ * needed.
*/
private DataOutputStream userDataStream;
@@ -542,14 +571,8 @@ public class HFileBlock extends SchemaCo
private byte[] onDiskBytesWithHeader;
/**
- * The total number of uncompressed bytes written into the current block,
- * with header size not included. Valid in the READY state.
- */
- private int uncompressedSizeWithoutHeader;
-
- /**
- * Only used when we are using cache-on-write. Valid in the READY state.
- * Contains the header and the uncompressed bytes, so the length is
+ * Valid in the READY state. Contains the header and the uncompressed (but
+ * potentially encoded, if this is a data block) bytes, so the length is
* {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}.
*/
private byte[] uncompressedBytesWithHeader;
@@ -566,30 +589,36 @@ public class HFileBlock extends SchemaCo
*/
private long[] prevOffsetByType;
- /**
- * Whether we are accumulating uncompressed bytes for the purpose of
- * caching on write.
- */
- private boolean cacheOnWrite;
-
/** The offset of the previous block of the same type */
private long prevOffset;
+ /** Whether we are including memstore timestamp after every key/value */
+ private boolean includesMemstoreTS;
+
/**
- * @param compressionAlgorithm
- * compression algorithm to use
+ * Unencoded data block for caching on write. Populated before encoding.
*/
- public Writer(Compression.Algorithm compressionAlgorithm) {
- compressAlgo = compressionAlgorithm == null ? NONE
- : compressionAlgorithm;
+ private HFileBlock unencodedDataBlockForCaching;
- baosOnDisk = new ByteArrayOutputStream();
+ /**
+ * @param compressionAlgorithm compression algorithm to use
+ * @param dataBlockEncoder data block encoder to use
+ * @param includesMemstoreTS whether a memstore timestamp is stored after
+ * every key/value pair
+ */
+ public Writer(Compression.Algorithm compressionAlgorithm,
+ HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
+ compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
+ this.dataBlockEncoder = dataBlockEncoder != null
+ ? dataBlockEncoder : new NoOpDataBlockEncoder();
+
+ baosInMemory = new ByteArrayOutputStream();
if (compressAlgo != NONE)
compressor = compressionAlgorithm.getCompressor();
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i)
prevOffsetByType[i] = -1;
+
+ this.includesMemstoreTS = includesMemstoreTS;
}
/**
@@ -598,44 +627,26 @@ public class HFileBlock extends SchemaCo
* @return the stream the user can write their data into
* @throws IOException
*/
- public DataOutputStream startWriting(BlockType newBlockType,
- boolean cacheOnWrite) throws IOException {
+ public DataOutputStream startWriting(BlockType newBlockType)
+ throws IOException {
if (state == State.BLOCK_READY && startOffset != -1) {
// We had a previous block that was written to a stream at a specific
// offset. Save that offset as the last offset of a block of that type.
- prevOffsetByType[blockType.ordinal()] = startOffset;
+ prevOffsetByType[blockType.getId()] = startOffset;
}
- this.cacheOnWrite = cacheOnWrite;
-
startOffset = -1;
blockType = newBlockType;
- baosOnDisk.reset();
- baosOnDisk.write(DUMMY_HEADER);
+ baosInMemory.reset();
+ baosInMemory.write(DUMMY_HEADER);
state = State.WRITING;
- if (compressAlgo == NONE) {
- // We do not need a compression stream or a second uncompressed stream
- // for cache-on-write.
- userDataStream = new DataOutputStream(baosOnDisk);
- } else {
- OutputStream compressingOutputStream =
- compressAlgo.createCompressionStream(baosOnDisk, compressor, 0);
- if (cacheOnWrite) {
- // We save uncompressed data in a cache-on-write mode.
- if (baosInMemory == null)
- baosInMemory = new ByteArrayOutputStream();
- baosInMemory.reset();
- baosInMemory.write(DUMMY_HEADER);
- userDataStream = new DataOutputStream(new DoubleOutputStream(
- compressingOutputStream, baosInMemory));
- } else {
- userDataStream = new DataOutputStream(compressingOutputStream);
- }
- }
+ unencodedDataBlockForCaching = null;
+ // We will compress it later in finishBlock()
+ userDataStream = new DataOutputStream(baosInMemory);
return userDataStream;
}
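To make the new write path concrete, here is a hedged usage sketch of the
Writer lifecycle after this change; startWriting no longer takes a
cacheOnWrite flag. The output stream and the serialized key/value bytes are
assumed to be supplied by the caller:

    // Sketch only, not part of this commit.
    HFileBlock.Writer w = new HFileBlock.Writer(
        Compression.Algorithm.GZ,           // or NONE
        new NoOpDataBlockEncoder(),         // or an HFileDataBlockEncoderImpl
        true /* includesMemstoreTS */);
    DataOutputStream dos = w.startWriting(BlockType.DATA);
    dos.write(serializedKeyValues);         // caller-supplied bytes (assumed)
    w.writeHeaderAndData(out);              // encode -> compress -> write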
@@ -662,45 +673,125 @@ public class HFileBlock extends SchemaCo
if (state == State.BLOCK_READY)
return;
+ // This will set state to BLOCK_READY.
finishBlock();
- state = State.BLOCK_READY;
}
/**
* An internal method that flushes the compressing stream (if using
* compression), serializes the header, and takes care of the separate
- * uncompressed stream for caching on write, if applicable. Block writer
- * state transitions must be managed by the caller.
+ * uncompressed stream for caching on write, if applicable. Sets block
+ * write state to "block ready".
*/
private void finishBlock() throws IOException {
userDataStream.flush();
- uncompressedSizeWithoutHeader = userDataStream.size();
- onDiskBytesWithHeader = baosOnDisk.toByteArray();
- prevOffset = prevOffsetByType[blockType.ordinal()];
- putHeader(onDiskBytesWithHeader, 0);
+ // This does an array copy, so it is safe to cache this byte array.
+ uncompressedBytesWithHeader = baosInMemory.toByteArray();
+ prevOffset = prevOffsetByType[blockType.getId()];
+
+ // We need to set state before we can package the block up for
+ // cache-on-write. In a way, the block is ready, but not yet encoded or
+ // compressed.
+ state = State.BLOCK_READY;
+ encodeDataBlockForDisk();
- if (cacheOnWrite && compressAlgo != NONE) {
- uncompressedBytesWithHeader = baosInMemory.toByteArray();
+ doCompression();
+ putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
+ uncompressedBytesWithHeader.length);
+
+ if (unencodedDataBlockForCaching != null) {
+ // We now know the final on-disk size, save it for caching.
+ unencodedDataBlockForCaching.onDiskSizeWithoutHeader =
+ getOnDiskSizeWithoutHeader();
+ unencodedDataBlockForCaching.overwriteHeader();
+ }
+ }
+
+ /**
+ * Do compression if it is enabled, or re-use the uncompressed buffer if
+ * it is not. Fills in the compressed block's header if doing compression.
+ */
+ private void doCompression() throws IOException {
+ // do the compression
+ if (compressAlgo != NONE) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(DUMMY_HEADER);
+
+ // compress the data
+ OutputStream compressingOutputStream =
+ compressAlgo.createCompressionStream(baos, compressor, 0);
+ compressingOutputStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
+ uncompressedBytesWithHeader.length - HEADER_SIZE);
+
+ // finish compression stream
+ compressingOutputStream.flush();
+
+ onDiskBytesWithHeader = baos.toByteArray();
+ putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
+ uncompressedBytesWithHeader.length);
+ } else {
+ onDiskBytesWithHeader = uncompressedBytesWithHeader;
+ }
+ }
+
+ /**
+ * Encodes this block if it is a data block and encoding is turned on in
+ * {@link #dataBlockEncoder}.
+ */
+ private void encodeDataBlockForDisk() throws IOException {
+ if (blockType != BlockType.DATA) {
+ return; // skip any non-data block
+ }
- if (uncompressedSizeWithoutHeader !=
+ // do data block encoding, if data block encoder is set
+ ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
+ HEADER_SIZE, uncompressedBytesWithHeader.length -
+ HEADER_SIZE).slice();
+ Pair<ByteBuffer, BlockType> encodingResult =
+ dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
+ includesMemstoreTS);
+
+ BlockType encodedBlockType = encodingResult.getSecond();
+ if (encodedBlockType == BlockType.ENCODED_DATA) {
+ // Save the unencoded block in case we need to cache it on write.
+ // We don't know the final on-disk size at this point, because
+ // compression has not been done yet, so we set it to the uncompressed
+ // size for now and override it later.
+ int uncompressedSizeWithoutHeader = getUncompressedSizeWithoutHeader();
+ unencodedDataBlockForCaching = new HFileBlock(blockType,
+ uncompressedSizeWithoutHeader, // will override this later
+ uncompressedSizeWithoutHeader, prevOffset,
+ getUncompressedBufferWithHeader(), FILL_HEADER, startOffset,
+ includesMemstoreTS);
+ uncompressedBytesWithHeader = encodingResult.getFirst().array();
+ blockType = encodedBlockType;
+ } else {
+ // There is no encoding configured. Do some extra sanity-checking.
+ if (encodedBlockType != BlockType.DATA) {
+ throw new IOException("Unexpected block type coming out of data " +
+ "block encoder: " + encodedBlockType);
+ }
+ if (userDataStream.size() !=
uncompressedBytesWithHeader.length - HEADER_SIZE) {
throw new IOException("Uncompressed size mismatch: "
- + uncompressedSizeWithoutHeader + " vs. "
+ + userDataStream.size() + " vs. "
+ (uncompressedBytesWithHeader.length - HEADER_SIZE));
}
-
- // Write the header into the beginning of the uncompressed byte array.
- putHeader(uncompressedBytesWithHeader, 0);
}
}
- /** Put the header into the given byte array at the given offset. */
- private void putHeader(byte[] dest, int offset) {
+ /**
+ * Put the header into the given byte array at the given offset.
+ * @param onDiskSize size of the block on disk, header included
+ * @param uncompressedSize size of the block after decompression, header
+ * included (but before optional data block decoding)
+ */
+ private void putHeader(byte[] dest, int offset, int onDiskSize,
+ int uncompressedSize) {
offset = blockType.put(dest, offset);
- offset = Bytes.putInt(dest, offset, onDiskBytesWithHeader.length
- - HEADER_SIZE);
- offset = Bytes.putInt(dest, offset, uncompressedSizeWithoutHeader);
+ offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
+ offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
Bytes.putLong(dest, offset, prevOffset);
}
@@ -793,7 +884,7 @@ public class HFileBlock extends SchemaCo
*/
public int getUncompressedSizeWithoutHeader() {
expectState(State.BLOCK_READY);
- return uncompressedSizeWithoutHeader;
+ return uncompressedBytesWithHeader.length - HEADER_SIZE;
}
/**
@@ -801,7 +892,7 @@ public class HFileBlock extends SchemaCo
*/
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
- return uncompressedSizeWithoutHeader + HEADER_SIZE;
+ return uncompressedBytesWithHeader.length;
}
/** @return true if a block is being written */
@@ -832,15 +923,6 @@ public class HFileBlock extends SchemaCo
private byte[] getUncompressedDataWithHeader() {
expectState(State.BLOCK_READY);
- if (compressAlgo == NONE)
- return onDiskBytesWithHeader;
-
- if (!cacheOnWrite)
- throw new IllegalStateException("Cache-on-write is turned off");
-
- if (uncompressedBytesWithHeader == null)
- throw new NullPointerException();
-
return uncompressedBytesWithHeader;
}
@@ -874,14 +956,18 @@ public class HFileBlock extends SchemaCo
*/
public void writeBlock(BlockWritable bw, FSDataOutputStream out)
throws IOException {
- bw.writeToBlock(startWriting(bw.getBlockType(), false));
+ bw.writeToBlock(startWriting(bw.getBlockType()));
writeHeaderAndData(out);
}
public HFileBlock getBlockForCaching() {
- return new HFileBlock(blockType, onDiskBytesWithHeader.length
- - HEADER_SIZE, uncompressedSizeWithoutHeader, prevOffset,
- getUncompressedBufferWithHeader(), false, startOffset);
+ if (unencodedDataBlockForCaching != null) {
+ return unencodedDataBlockForCaching;
+ }
+ return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
+ getUncompressedSizeWithoutHeader(), prevOffset,
+ getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
+ includesMemstoreTS);
}
}
@@ -963,14 +1049,18 @@ public class HFileBlock extends SchemaCo
/** The size of the file we are reading from, or -1 if unknown. */
protected long fileSize;
+ /** Data block encoding used to read from file */
+ protected HFileDataBlockEncoder dataBlockEncoder;
+
/** The default buffer size for our buffered streams */
public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
public AbstractFSReader(FSDataInputStream istream, Algorithm compressAlgo,
- long fileSize) {
+ long fileSize, HFileDataBlockEncoder dataBlockEncoder) {
this.istream = istream;
this.compressAlgo = compressAlgo;
this.fileSize = fileSize;
+ this.dataBlockEncoder = dataBlockEncoder;
}
@Override
@@ -1133,7 +1223,12 @@ public class HFileBlock extends SchemaCo
public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
long fileSize) {
- super(istream, compressAlgo, fileSize);
+ this(istream, compressAlgo, fileSize, new NoOpDataBlockEncoder());
+ }
+
+ public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
+ long fileSize, HFileDataBlockEncoder blockEncoder) {
+ super(istream, compressAlgo, fileSize, blockEncoder);
}
/**
@@ -1156,7 +1251,8 @@ public class HFileBlock extends SchemaCo
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
- int uncompressedSizeWithMagic, boolean pread) throws IOException {
+ int uncompressedSizeWithMagic, boolean pread)
+ throws IOException {
if (uncompressedSizeWithMagic <= 0) {
throw new IOException("Invalid uncompressedSize="
+ uncompressedSizeWithMagic + " for a version 1 block");
@@ -1214,7 +1310,8 @@ public class HFileBlock extends SchemaCo
// to the size of the data portion of the block without the magic record,
// since the magic record gets moved to the header.
HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
- uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, true, offset);
+ uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
+ offset, MemStore.NO_PERSISTENT_TS);
return b;
}
}
@@ -1232,6 +1329,9 @@ public class HFileBlock extends SchemaCo
/** Reads version 2 blocks from the filesystem. */
public static class FSReaderV2 extends AbstractFSReader {
+ /** Whether we include memstore timestamp in data blocks */
+ protected boolean includesMemstoreTS;
+
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
new ThreadLocal<PrefetchedHeader>() {
@Override
@@ -1242,7 +1342,12 @@ public class HFileBlock extends SchemaCo
public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
long fileSize) {
- super(istream, compressAlgo, fileSize);
+ this(istream, compressAlgo, fileSize, new NoOpDataBlockEncoder());
+ }
+
+ public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
+ long fileSize, HFileDataBlockEncoder dataBlockEncoder) {
+ super(istream, compressAlgo, fileSize, dataBlockEncoder);
}
/**
@@ -1438,6 +1543,13 @@ public class HFileBlock extends SchemaCo
}
}
}
+
+ b.includesMemstoreTS = includesMemstoreTS;
+
+ if (b.getBlockType() == BlockType.ENCODED_DATA) {
+ b = dataBlockEncoder.afterReadFromDisk(b);
+ }
+
b.offset = offset;
return b;
}
@@ -1451,6 +1563,10 @@ public class HFileBlock extends SchemaCo
prefetchedHeader.header, 0, HEADER_SIZE);
}
+ void setIncludesMemstoreTS(boolean enabled) {
+ includesMemstoreTS = enabled;
+ }
+
}
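A hedged sketch of wiring up a version 2 reader with an encoder after this
change. The readBlockData signature is taken from the FSReaderV1 override
above; passing -1 for the sizes when they are unknown is an assumption:

    // Sketch only, not part of this commit.
    HFileBlock.FSReaderV2 reader = new HFileBlock.FSReaderV2(
        istream, compressAlgo, fileSize,
        new HFileDataBlockEncoderImpl(onDiskAlgo, inCacheAlgo, true));
    reader.setIncludesMemstoreTS(true);
    HFileBlock b = reader.readBlockData(offset, -1, -1, false /* pread */);
    // If b is ENCODED_DATA, afterReadFromDisk(b) has already been applied
    // inside readBlockData (see above).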
@Override
@@ -1518,5 +1634,9 @@ public class HFileBlock extends SchemaCo
return true;
}
+ public boolean doesIncludeMemstoreTS() {
+ return includesMemstoreTS;
+ }
+
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Sat Dec 24 21:20:39 2011
@@ -210,7 +210,8 @@ public class HFileBlockIndex {
}
// Found a data block, break the loop and check our level in the tree.
- if (block.getBlockType().equals(BlockType.DATA)) {
+ if (block.getBlockType().equals(BlockType.DATA) ||
+ block.getBlockType().equals(BlockType.ENCODED_DATA)) {
break;
}
@@ -733,8 +734,8 @@ public class HFileBlockIndex {
long rootLevelIndexPos = out.getPos();
{
- DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX,
- false);
+ DataOutput blockStream =
+ blockWriter.startWriting(BlockType.ROOT_INDEX);
rootChunk.writeRoot(blockStream);
if (midKeyMetadata != null)
blockStream.write(midKeyMetadata);
@@ -829,7 +830,7 @@ public class HFileBlockIndex {
BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
long beginOffset = out.getPos();
DataOutputStream dos = blockWriter.startWriting(
- BlockType.INTERMEDIATE_INDEX, cacheOnWrite());
+ BlockType.INTERMEDIATE_INDEX);
curChunk.writeNonRoot(dos);
byte[] curFirstKey = curChunk.getBlockKey(0);
blockWriter.writeHeaderAndData(out);
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Controls what kind of data block encoding is used. If data block encoding
+ * is not set, methods just return the unmodified block. The methods only do
+ * something meaningful if the BlockType is DATA or ENCODED_DATA; otherwise
+ * they return the unmodified block.
+ * <p>
+ * Read path: [parsed from disk] -> {@link #afterReadFromDisk(HFileBlock)} ->
+ * [caching] ->
+ * {@link #afterReadFromDiskAndPuttingInCache(HFileBlock, boolean, boolean)}
+ * -> [used somewhere]
+ * <p>
+ * where [caching] looks like:
+ * <pre>
+ * ------------------------------------>
+ * \----> {@link #beforeBlockCache(HFileBlock, boolean)}
+ * </pre>
+ * <p>
+ * Write path: [sorted KeyValues have been created] ->
+ * {@link #beforeWriteToDisk(ByteBuffer, boolean)} -> [(optional) compress] -> [write to
+ * disk]
+ * <p>
+ * Reading from cache path: [get from cache] ->
+ * {@link #afterBlockCache(HFileBlock, boolean, boolean)}
+ * <p>
+ * Storing data in file info: {@link #saveMetadata(StoreFile.Writer)}
+ * <p>
+ * Creating algorithm specific Scanner: {@link #useEncodedScanner()}
+ */
+public interface HFileDataBlockEncoder {
+ /**
+ * Should be called after each HFileBlock of type DATA or ENCODED_DATA is
+ * read from disk, but before it is put into the cache.
+ * @param block Block read from an HFile stored on disk.
+ * @return a non-null block encoded according to the settings.
+ */
+ public HFileBlock afterReadFromDisk(HFileBlock block);
+
+ /**
+ * Should be called after each HFileBlock of type DATA or ENCODED_DATA is
+ * read from disk and after it is saved in the cache.
+ * @param block Block read from an HFile stored on disk.
+ * @param isCompaction Will the block be used for compaction.
+ * @param includesMemstoreTS whether a memstore timestamp follows each
+ * key/value pair
+ * @return a non-null block encoded according to the settings.
+ */
+ public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
+ boolean isCompaction, boolean includesMemstoreTS);
+
+ /**
+ * Should be called before an encoded or unencoded data block is written to
+ * disk.
+ * @param in a buffer of serialized KeyValues, laid out back to back
+ * @param includesMemstoreTS whether a memstore timestamp follows each
+ * key/value pair
+ * @return a non-null on-heap buffer containing the contents of the
+ * HFileBlock with unfilled header and block type
+ */
+ public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+ ByteBuffer in, boolean includesMemstoreTS);
+
+ /**
+ * Should always be called before putting a block into cache.
+ * @param block block that needs to be put into cache.
+ * @return the block to put into cache instead (possibly the same)
+ */
+ public HFileBlock beforeBlockCache(HFileBlock block,
+ boolean includesMemstoreTS);
+
+ /**
+ * After getting block from cache.
+ * @param block block which was returned from cache, may be null.
+ * @param isCompaction Will block be used for compaction.
+ * @param includesMemstoreTS whether we have a memstore timestamp encoded
+ * as a variable-length integer after each key-value pair
+ * @return HFileBlock to use. Can be null, even if argument is not null.
+ */
+ public HFileBlock afterBlockCache(HFileBlock block,
+ boolean isCompaction, boolean includesMemstoreTS);
+
+ /**
+ * Should a special version of the scanner be used?
+ * @param isCompaction Will the scanner be used for compaction.
+ * @return Whether to use an encoded scanner.
+ */
+ public boolean useEncodedScanner(boolean isCompaction);
+
+ /**
+ * Save metadata in the StoreFile, which will be written to disk
+ * @param storeFileWriter writer for a given StoreFile
+ * @exception IOException on disk problems
+ */
+ public void saveMetadata(StoreFile.Writer storeFileWriter) throws IOException;
+}
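The commit also adds a NoOpDataBlockEncoder (not shown in this excerpt). As a
rough illustration of the contract above, here is a minimal pass-through
sketch; the class name is hypothetical and the real no-op encoder may differ.
Package and imports are assumed to match the interface file above:

    // Hypothetical pass-through implementation of HFileDataBlockEncoder.
    public class PassThroughDataBlockEncoder implements HFileDataBlockEncoder {
      @Override
      public HFileBlock afterReadFromDisk(HFileBlock block) {
        return block;                       // nothing to decode
      }

      @Override
      public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
          boolean isCompaction, boolean includesMemstoreTS) {
        return block;
      }

      @Override
      public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
          boolean includesMemstoreTS) {
        // Leave the buffer as-is and keep the plain DATA block type.
        return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
      }

      @Override
      public HFileBlock beforeBlockCache(HFileBlock block,
          boolean includesMemstoreTS) {
        return block;
      }

      @Override
      public HFileBlock afterBlockCache(HFileBlock block, boolean isCompaction,
          boolean includesMemstoreTS) {
        return block;
      }

      @Override
      public boolean useEncodedScanner(boolean isCompaction) {
        return false;                       // plain scanners suffice
      }

      @Override
      public void saveMetadata(StoreFile.Writer storeFileWriter) {
        // no encoding metadata to persist
      }
    }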
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings.Algorithm;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+/**
+ * Do different kinds of data block encoding according to column family
+ * options.
+ */
+public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
+ private final DataBlockEncodings.Algorithm onDisk;
+ private final DataBlockEncodings.Algorithm inCache;
+
+ public static final boolean NO_ENCODED_SEEK = false;
+
+ private final boolean encodedSeek;
+
+ /**
+ * Do data block encoding with the specified options.
+ * @param onDisk What kind of data block encoding will be used before writing
+ * HFileBlock to disk.
+ * @param inCache What kind of data block encoding will be used in block
+ * cache.
+ * @param encodedSeek should we seek over encoded data blocks (true) or
+ * decode blocks first and use normal seek operations (false)
+ */
+ public HFileDataBlockEncoderImpl(Algorithm onDisk, Algorithm inCache,
+ boolean encodedSeek) {
+ this.onDisk = onDisk != null ?
+ onDisk : DataBlockEncodings.Algorithm.NONE;
+ this.inCache = inCache != null ?
+ inCache : DataBlockEncodings.Algorithm.NONE;
+ this.encodedSeek = encodedSeek;
+ }
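    // A hypothetical construction sketch (not in this diff). The PREFIX
    // algorithm name is an assumption about DataBlockEncodings, whose source
    // is not shown here:
    //
    //   HFileDataBlockEncoder encoder = new HFileDataBlockEncoderImpl(
    //       DataBlockEncodings.Algorithm.PREFIX,   // onDisk
    //       DataBlockEncodings.Algorithm.PREFIX,   // inCache
    //       true);                                 // encodedSeek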
+
+ /**
+ * @return the data block encoding algorithm used on disk
+ */
+ public DataBlockEncodings.Algorithm getOnDisk() {
+ return onDisk;
+ }
+
+ /**
+ * @return the data block encoding algorithm used in the block cache
+ */
+ public DataBlockEncodings.Algorithm getInCache() {
+ return inCache;
+ }
+
+ /**
+ * @return whether we should do seek operations on encoded blocks
+ */
+ public boolean useEncodedSeek() {
+ return encodedSeek
+ && inCache != DataBlockEncodings.Algorithm.NONE;
+ }
+
+ // Preconditions: any HFileBlock format
+ // Postconditions: HFileBlock is either not encoded,
+ // or encoded in the same format as inCache
+ @Override
+ public HFileBlock afterReadFromDisk(HFileBlock block) {
+ if (ignoreBlock(block)) {
+ return block; // non-DATA block, skip it
+ }
+
+ // is already encoded in desired encoding
+ if (block.getBlockType() == BlockType.ENCODED_DATA &&
+ block.getDataBlockEncodingId() == inCache.getId()) {
+ return block;
+ }
+
+ // decode if we need it
+ HFileBlock decompressedBlock;
+ if (block.getBlockType() == BlockType.ENCODED_DATA) {
+ decompressedBlock = decodeDataBlock(block, false, (short) 0,
+ block.doesIncludeMemstoreTS());
+ } else {
+ decompressedBlock = block;
+ }
+
+ // check if we want to encode it here
+ if (encodedSeek && inCache != DataBlockEncodings.Algorithm.NONE &&
+ onDisk != DataBlockEncodings.Algorithm.NONE) {
+ return encodeDataBlock(decompressedBlock, inCache,
+ block.doesIncludeMemstoreTS());
+ }
+
+ return decompressedBlock;
+ }
+
+ /**
+ * Preconditions: HFileBlock not encoded or encoded in the {@link #inCache}
+ * format.
+ * <p>
+ * Postconditions:
+ * <ul>
+ * <li>if isCompaction is set and {@link #onDisk} is NONE there is no
+ * encoding</li>
+ * <li>if {@link #encodedSeek} is set, the encoding is the same as inCache;
+ * otherwise there is no encoding</li>
+ * </ul>
+ */
+ @Override
+ public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
+ boolean isCompaction, boolean includesMemstoreTS) {
+ if (ignoreBlock(block)) {
+ return block; // non-DATA block, skip it
+ }
+
+ // use decoded buffer in case of compaction
+ if (dontEncodeBeforeCompaction(isCompaction)) {
+ if (block.getBlockType() != BlockType.DATA) {
+ return decodeDataBlock(block, true, inCache.getId(),
+ includesMemstoreTS);
+ }
+ return block;
+ }
+
+ if (!encodedSeek) {
+ // we need to have it decoded in memory
+ if (block.getBlockType() != BlockType.DATA) {
+ return decodeDataBlock(block, true, inCache.getId(),
+ includesMemstoreTS);
+ }
+ return block;
+ }
+
+ // the data is already in the desired format
+ if (block.getBlockType() == BlockType.ENCODED_DATA &&
+ block.getDataBlockEncodingId() == inCache.getId()) {
+ return block;
+ }
+
+ if (block.getBlockType() == BlockType.ENCODED_DATA) {
+ throw new IllegalStateException("Unexpected encoding");
+ }
+
+ // need to encode it
+ if (inCache != DataBlockEncodings.Algorithm.NONE) {
+ return encodeDataBlock(block, inCache, includesMemstoreTS);
+ }
+
+ return block;
+ }
+
+ // Precondition: not encoded buffer
+ // Postcondition: same encoding as onDisk
+ @Override
+ public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
+ boolean includesMemstoreTS) {
+ if (onDisk == DataBlockEncodings.Algorithm.NONE) {
+ // there is no need to encode the block before writing it to disk
+ return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+ }
+
+ ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in,
+ onDisk, includesMemstoreTS);
+ return new Pair<ByteBuffer, BlockType>(encodedBuffer,
+ BlockType.ENCODED_DATA);
+ }
+
+ // Precondition: an unencoded block or the same encoding as inCache
+ // Postcondition: same encoding as inCache
+ @Override
+ public HFileBlock beforeBlockCache(HFileBlock block,
+ boolean includesMemstoreTS) {
+ if (ignoreBlock(block)) {
+ return block; // non-DATA block, skip it
+ }
+
+ if (block.getBlockType() == BlockType.ENCODED_DATA) {
+ if (block.getDataBlockEncodingId() == inCache.getId()) {
+ // already encoded in the right format
+ return block;
+ }
+
+ // expecting either the "in-cache" encoding or no encoding
+ throw new IllegalStateException(String.format(
+ "Expected the in-cache encoding ('%s') or no encoding, " +
+ "but got encoding '%s'", inCache.toString(),
+ DataBlockEncodings.getNameFromId(
+ block.getDataBlockEncodingId())));
+ }
+
+ if (inCache != DataBlockEncodings.Algorithm.NONE) {
+ // encode data
+ HFileBlock encodedBlock = encodeDataBlock(block, inCache,
+ includesMemstoreTS);
+ block.passSchemaMetricsTo(encodedBlock);
+ return encodedBlock;
+ }
+
+ return block;
+ }
+
+ /**
+ * Precondition: same encoding as in inCache
+ * <p>
+ * Postcondition: if (isCompaction is set and {@link #onDisk} is NONE) or
+ * {@link #encodedSeek} is not set -> the result is not encoded.
+ */
+ @Override
+ public HFileBlock afterBlockCache(HFileBlock block, boolean isCompaction,
+ boolean includesMemstoreTS) {
+ if (block == null || ignoreBlock(block)) {
+ return block; // skip non-DATA blocks
+ }
+
+ if (inCache == DataBlockEncodings.Algorithm.NONE) {
+ // no need for decoding
+ if (block.getBlockType() == BlockType.ENCODED_DATA) {
+ throw new IllegalStateException("Expected non-encoded data in cache.");
+ }
+ return block;
+ }
+
+ if (block.getBlockType() != BlockType.ENCODED_DATA) {
+ throw new IllegalStateException("Expected encoded data in cache.");
+ }
+
+ if (dontEncodeBeforeCompaction(isCompaction)) {
+ // If we don't use data block encoding on disk, we also avoid using
+ // it for compactions. That way we don't change the on-disk format.
+ return null;
+ }
+
+ if (encodedSeek) {
+ // we use encoding in memory
+ return block;
+ }
+
+ return decodeDataBlock(block, true, inCache.getId(), includesMemstoreTS);
+ }
+
+ @Override
+ public boolean useEncodedScanner(boolean isCompaction) {
+ if (isCompaction && onDisk == DataBlockEncodings.Algorithm.NONE) {
+ return false;
+ }
+ return encodedSeek && inCache != DataBlockEncodings.Algorithm.NONE;
+ }
+
+ @Override
+ public void saveMetadata(StoreFile.Writer storeFileWriter)
+ throws IOException {
+ storeFileWriter.appendFileInfo(StoreFile.DATA_BLOCK_ENCODING,
+ onDisk.getNameInBytes());
+ }
+
+ private HFileBlock decodeDataBlock(HFileBlock block, boolean verifyEncoding,
+ short expectedEncoderId, boolean includesMemstoreTS) {
+ assert block.getBlockType() == BlockType.ENCODED_DATA;
+ short dataBlockEncoderId = block.getDataBlockEncodingId();
+
+ // (optional) sanity check of encoder type
+ if (verifyEncoding && expectedEncoderId != dataBlockEncoderId) {
+ throw new IllegalStateException(String.format(
+ "Expected encoding type '%d', but found '%d'",
+ expectedEncoderId, dataBlockEncoderId));
+ }
+
+ ByteBuffer originalBuf = block.getBufferReadOnly();
+ ByteBuffer withoutEncodedHeader = ByteBuffer.wrap(originalBuf.array(),
+ originalBuf.arrayOffset() + HFileBlock.ENCODED_HEADER_SIZE,
+ originalBuf.limit() - HFileBlock.ENCODED_HEADER_SIZE).slice();
+ ByteBufferInputStream bbis =
+ new ByteBufferInputStream(withoutEncodedHeader);
+ DataInputStream dis;
+ ByteBuffer newBuf;
+ DataBlockEncoder dataBlockEncoder = null;
+
+ try {
+ dis = new DataInputStream(bbis);
+ dataBlockEncoder =
+ DataBlockEncodings.getDataBlockEncoderFromId(dataBlockEncoderId);
+ int preReadLength = originalBuf.limit() -
+ HFileBlock.HEADER_SIZE - block.getUncompressedSizeWithoutHeader();
+ // Sometimes the buffer is larger because it also contains the next
+ // block's header. In that case we want to skip it.
+ newBuf = dataBlockEncoder.uncompressKeyValues(dis, HFileBlock.HEADER_SIZE,
+ preReadLength, includesMemstoreTS);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Bug while decoding the block using '%s'", dataBlockEncoder), e);
+ }
+
+ // Create a decoded HFileBlock. Offset will be set later.
+ return new HFileBlock(BlockType.DATA, block.getOnDiskSizeWithoutHeader(),
+ newBuf.limit() - HFileBlock.HEADER_SIZE, block.getPrevBlockOffset(),
+ newBuf, HFileBlock.FILL_HEADER, 0, includesMemstoreTS);
+ }
+
+ private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in,
+ DataBlockEncodings.Algorithm algo, boolean includesMemstoreTS) {
+ ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(encodedStream);
+ DataBlockEncoder encoder = algo.getEncoder();
+ try {
+ encodedStream.write(HFileBlock.DUMMY_HEADER);
+ algo.writeIdInBytes(dataOut);
+ encoder.compressKeyValues(dataOut, in,
+ includesMemstoreTS);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Bug in data block encoder " +
+ "'%s', it probably requested too much data", algo.toString()), e);
+ }
+ return ByteBuffer.wrap(encodedStream.toByteArray());
+ }
+
+ private HFileBlock encodeDataBlock(HFileBlock block,
+ DataBlockEncodings.Algorithm algo, boolean includesMemstoreTS) {
+ ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer(
+ block.getBufferWithoutHeader(), algo, includesMemstoreTS);
+ int sizeWithoutHeader = compressedBuffer.limit() - HFileBlock.HEADER_SIZE;
+ return new HFileBlock(BlockType.ENCODED_DATA,
+ block.getOnDiskSizeWithoutHeader(),
+ sizeWithoutHeader, block.getPrevBlockOffset(),
+ compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(),
+ includesMemstoreTS);
+ }
+
+ private boolean ignoreBlock(HFileBlock block) {
+ BlockType type = block.getBlockType();
+ return type != BlockType.DATA && type != BlockType.ENCODED_DATA;
+ }
+
+ private boolean dontEncodeBeforeCompaction(boolean isCompaction) {
+ return isCompaction
+ && onDisk == DataBlockEncodings.Algorithm.NONE;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(getClass().getSimpleName()
+ + " onDisk='%s' inCache='%s' encodedSeek=%s", onDisk.toString(),
+ inCache.toString(), encodedSeek);
+ }
+}
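Taken together, the hooks above bracket the block cache on the read path. A
hedged sketch of that sequence, mirroring the calls made in HFileReaderV2
further down in this commit; the String cache key and the helper's signature
are assumptions:

    // Sketch only; mirrors the read path in HFileReaderV2 below.
    HFileBlock readThroughCache(BlockCache cache, String cacheKey,
        HFileBlock.FSReaderV2 fsReader, HFileDataBlockEncoder encoder,
        long offset, boolean pread, boolean isCompaction,
        boolean includesMemstoreTS) throws IOException {
      HFileBlock b = (HFileBlock) cache.getBlock(cacheKey, true);
      b = encoder.afterBlockCache(b, isCompaction, includesMemstoreTS);
      if (b != null) {
        return b;                           // usable cache hit
      }
      b = fsReader.readBlockData(offset, -1, -1, pread);
      // afterReadFromDisk(...) has already run inside the FS reader.
      cache.cacheBlock(cacheKey,
          encoder.beforeBlockCache(b, includesMemstoreTS), false);
      return encoder.afterReadFromDiskAndPuttingInCache(b, isCompaction,
          includesMemstoreTS);
    }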
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Sat Dec 24 21:20:39 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
@@ -170,6 +171,7 @@ public class HFilePrettyPrinter {
conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
conf.set("fs.default.name",
conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+ SchemaMetrics.configureGlobally(conf);
try {
if (!parseOptions(args))
return 1;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Sat Dec 24 21:20:39 2011
@@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
@@ -57,12 +59,16 @@ public class HFileReaderV1 extends Abstr
* stream.
* @param size Length of the stream.
* @param cacheConf cache references and configuration
+ * @param blockEncoder what kind of data block encoding will be used
+ * @throws IOException
*/
public HFileReaderV1(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long size,
final boolean closeIStream,
- final CacheConfig cacheConf) {
- super(path, trailer, fsdis, size, closeIStream, cacheConf);
+ final CacheConfig cacheConf,
+ final HFileDataBlockEncoder blockEncoder) {
+ super(path, trailer, fsdis, size, closeIStream, cacheConf,
+ blockEncoder);
trailer.expectVersion(1);
fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
@@ -167,6 +173,9 @@ public class HFileReaderV1 extends Abstr
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
+ if (blockEncoder.useEncodedScanner(isCompaction)) {
+ return new EncodedScannerV1(this, cacheBlocks, pread, isCompaction);
+ }
return new ScannerV1(this, cacheBlocks, pread, isCompaction);
}
@@ -295,6 +304,8 @@ public class HFileReaderV1 extends Abstr
HFileBlock cachedBlock =
(HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
cacheConf.shouldCacheDataOnRead());
+ cachedBlock = blockEncoder.afterBlockCache(cachedBlock,
+ isCompaction, MemStore.NO_PERSISTENT_TS);
if (cachedBlock != null) {
cacheHits.incrementAndGet();
getSchemaMetrics().updateOnCacheHit(cachedBlock.getBlockType().getCategory(),
@@ -322,7 +333,7 @@ public class HFileReaderV1 extends Abstr
- offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
passSchemaMetricsTo(hfileBlock);
hfileBlock.expectType(BlockType.DATA);
- ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
+ hfileBlock = blockEncoder.afterReadFromDisk(hfileBlock);
long delta = System.nanoTime() - startTimeNs;
if (pread) {
@@ -338,10 +349,17 @@ public class HFileReaderV1 extends Abstr
// Cache the block
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
hfileBlock.getBlockType().getCategory())) {
+ hfileBlock = blockEncoder.beforeBlockCache(hfileBlock,
+ MemStore.NO_PERSISTENT_TS);
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
cacheConf.isInMemory());
}
+ hfileBlock = blockEncoder.afterReadFromDiskAndPuttingInCache(
+ hfileBlock, isCompaction, MemStore.NO_PERSISTENT_TS);
+
+ ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
+
return buf;
}
}
@@ -396,16 +414,101 @@ public class HFileReaderV1 extends Abstr
}
}
+ protected abstract static class AbstractScannerV1
+ extends AbstractHFileReader.Scanner {
+ protected final HFileReaderV1 readerV1;
+ protected int currBlock;
+
+ public AbstractScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ super(reader, cacheBlocks, pread, isCompaction);
+ readerV1 = reader;
+ }
+
+ /**
+ * Within a loaded block, seek looking for the first key
+ * that is smaller than (or equal to?) the key we are interested in.
+ *
+ * A note on the seekBefore - if you have seekBefore = true, AND the
+ * first key in the block = key, then you'll get thrown exceptions.
+ * @param key to find
+ * @param seekBefore find the key before the exact match.
+ * @return
+ */
+ protected abstract int blockSeek(byte[] key, int offset, int length,
+ boolean seekBefore);
+
+ protected abstract void loadBlock(int bloc, boolean rewind)
+ throws IOException;
+
+ @Override
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ int b = readerV1.blockContainingKey(key, offset, length);
+ if (b < 0) return -1; // falls before the beginning of the file! :-(
+ // Avoid re-reading the same block (that'd be dumb).
+ loadBlock(b, true);
+ return blockSeek(key, offset, length, false);
+ }
+
+ @Override
+ public int reseekTo(byte[] key, int offset, int length)
+ throws IOException {
+ if (blockBuffer != null && currKeyLen != 0) {
+ ByteBuffer bb = getKey();
+ int compared = reader.getComparator().compare(key, offset,
+ length, bb.array(), bb.arrayOffset(), bb.limit());
+ if (compared < 1) {
+ // If the required key is less than or equal to current key, then
+ // don't do anything.
+ return compared;
+ }
+ }
+
+ int b = readerV1.blockContainingKey(key, offset, length);
+ if (b < 0) {
+ return -1;
+ }
+ loadBlock(b, false);
+ return blockSeek(key, offset, length, false);
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key, int offset, int length)
+ throws IOException {
+ int b = readerV1.blockContainingKey(key, offset, length);
+ if (b < 0)
+ return false; // key is before the start of the file.
+
+ // Question: does this block begin with 'key'?
+ byte[] firstKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
+ if (reader.getComparator().compare(firstKey, 0, firstKey.length,
+ key, offset, length) == 0) {
+ // Ok the key we're interested in is the first of the block, so go back
+ // by one.
+ if (b == 0) {
+ // we have a 'problem', the key we want is the first of the file.
+ return false;
+ }
+ b--;
+ // TODO shortcut: seek forward in this block to the last key of the
+ // block.
+ }
+ loadBlock(b, true);
+ blockSeek(key, offset, length, true);
+ return true;
+ }
+ }
+
/**
* Implementation of {@link HFileScanner} interface.
*/
- protected static class ScannerV1 extends AbstractHFileReader.Scanner {
- private final HFileReaderV1 reader;
- private int currBlock;
+
+ protected static class ScannerV1 extends AbstractScannerV1 {
+ private HFileReaderV1 reader;
public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
- super(cacheBlocks, pread, isCompaction);
+ super(reader, cacheBlocks, pread, isCompaction);
this.reader = reader;
}
@@ -486,57 +589,7 @@ public class HFileReaderV1 extends Abstr
}
@Override
- public int seekTo(byte[] key) throws IOException {
- return seekTo(key, 0, key.length);
- }
-
- @Override
- public int seekTo(byte[] key, int offset, int length) throws IOException {
- int b = reader.blockContainingKey(key, offset, length);
- if (b < 0) return -1; // falls before the beginning of the file! :-(
- // Avoid re-reading the same block (that'd be dumb).
- loadBlock(b, true);
- return blockSeek(key, offset, length, false);
- }
-
- @Override
- public int reseekTo(byte[] key) throws IOException {
- return reseekTo(key, 0, key.length);
- }
-
- @Override
- public int reseekTo(byte[] key, int offset, int length)
- throws IOException {
- if (blockBuffer != null && currKeyLen != 0) {
- ByteBuffer bb = getKey();
- int compared = reader.getComparator().compare(key, offset,
- length, bb.array(), bb.arrayOffset(), bb.limit());
- if (compared <= 0) {
- // If the required key is less than or equal to current key, then
- // don't do anything.
- return compared;
- }
- }
-
- int b = reader.blockContainingKey(key, offset, length);
- if (b < 0) {
- return -1;
- }
- loadBlock(b, false);
- return blockSeek(key, offset, length, false);
- }
-
- /**
- * Within a loaded block, seek looking for the first key
- * that is smaller than (or equal to?) the key we are interested in.
- *
- * A note on the seekBefore - if you have seekBefore = true, AND the
- * first key in the block = key, then you'll get thrown exceptions.
- * @param key to find
- * @param seekBefore find the key before the exact match.
- * @return
- */
- private int blockSeek(byte[] key, int offset, int length,
+ protected int blockSeek(byte[] key, int offset, int length,
boolean seekBefore) {
int klen, vlen;
int lastLen = 0;
@@ -578,37 +631,6 @@ public class HFileReaderV1 extends Abstr
}
@Override
- public boolean seekBefore(byte[] key) throws IOException {
- return seekBefore(key, 0, key.length);
- }
-
- @Override
- public boolean seekBefore(byte[] key, int offset, int length)
- throws IOException {
- int b = reader.blockContainingKey(key, offset, length);
- if (b < 0)
- return false; // key is before the start of the file.
-
- // Question: does this block begin with 'key'?
- byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
- if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
- key, offset, length) == 0) {
- // Ok the key we're interested in is the first of the block, so go back
- // by one.
- if (b == 0) {
- // we have a 'problem', the key we want is the first of the file.
- return false;
- }
- b--;
- // TODO shortcut: seek forward in this block to the last key of the
- // block.
- }
- loadBlock(b, true);
- blockSeek(key, offset, length, true);
- return true;
- }
-
- @Override
public String getKeyString() {
return Bytes.toStringBinary(blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
@@ -621,11 +643,6 @@ public class HFileReaderV1 extends Abstr
}
@Override
- public Reader getReader() {
- return reader;
- }
-
- @Override
public boolean seekTo() throws IOException {
if (reader.getDataBlockIndexReader().isEmpty()) {
return false;
@@ -645,7 +662,8 @@ public class HFileReaderV1 extends Abstr
return true;
}
- private void loadBlock(int bloc, boolean rewind) throws IOException {
+ @Override
+ protected void loadBlock(int bloc, boolean rewind) throws IOException {
if (blockBuffer == null) {
blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
@@ -672,6 +690,115 @@ public class HFileReaderV1 extends Abstr
}
+ protected static class EncodedScannerV1 extends AbstractScannerV1 {
+ private DataBlockEncoder.EncodedSeeker seeker = null;
+ private DataBlockEncoder dataBlockEncoder = null;
+
+ public EncodedScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+ boolean pread, boolean isCompaction) {
+ super(reader, cacheBlocks, pread, isCompaction);
+ }
+
+ @Override
+ public boolean seekTo() throws IOException {
+ if (reader.getDataBlockIndexReader().isEmpty()) {
+ return false;
+ }
+
+ loadBlock(0, true);
+ return true;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (blockBuffer == null) {
+ throw new IOException("Next called on non-seeked scanner");
+ }
+
+ boolean ok = seeker.next();
+
+ if (!ok) {
+ if (currBlock + 1 >=
+ reader.getDataBlockIndexReader().getRootBlockCount()) {
+ // damn we are at the end
+ currBlock = 0;
+ blockBuffer = null;
+ return false;
+ }
+ loadBlock(currBlock + 1, false);
+ ok = true;
+ }
+
+ return ok;
+ }
+
+ @Override
+ public ByteBuffer getKey() {
+ return seeker.getKey();
+ }
+
+ @Override
+ public ByteBuffer getValue() {
+ return seeker.getValue();
+ }
+
+ @Override
+ public KeyValue getKeyValue() {
+ if (blockBuffer == null) {
+ return null;
+ }
+ return seeker.getKeyValueObject();
+ }
+
+ @Override
+ public String getKeyString() {
+ ByteBuffer keyBuffer = seeker.getKey();
+ return Bytes.toStringBinary(keyBuffer.array(),
+ keyBuffer.arrayOffset(), keyBuffer.limit());
+ }
+
+ @Override
+ public String getValueString() {
+ ByteBuffer valueBuffer = seeker.getValue();
+ return Bytes.toStringBinary(valueBuffer.array(),
+ valueBuffer.arrayOffset(), valueBuffer.limit());
+ }
+
+ @Override
+ protected int blockSeek(byte[] key, int offset, int length,
+ boolean seekBefore) {
+ return seeker.blockSeekTo(key, offset, length, seekBefore);
+ }
+
+ @Override
+ protected void loadBlock(int bloc, boolean rewind) throws IOException {
+ if (blockBuffer == null || bloc != currBlock) {
+ blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+ isCompaction);
+ currBlock = bloc;
+ blockFetches++;
+ short dataBlockEncoderId = blockBuffer.getShort();
+ blockBuffer = blockBuffer.slice();
+
+ if (seeker == null ||
+ !DataBlockEncodings.isCorrectEncoder(
+ dataBlockEncoder, dataBlockEncoderId)) {
+ dataBlockEncoder =
+ DataBlockEncodings.getDataBlockEncoderFromId(dataBlockEncoderId);
+ seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
+ MemStore.NO_PERSISTENT_TS);
+ }
+ seeker.setCurrentBuffer(blockBuffer);
+
+ } else {
+ // we are already in the same block, just rewind to seek again.
+ if (rewind) {
+ seeker.rewind();
+ }
+ }
+ }
+ }
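    // A hedged sketch of the EncodedSeeker protocol the scanner above relies
    // on; method names are taken from this diff, while the surrounding setup
    // (comparator, encoded payload buffer) is assumed:
    //
    //   DataBlockEncoder enc =
    //       DataBlockEncodings.getDataBlockEncoderFromId(encoderId);
    //   DataBlockEncoder.EncodedSeeker seeker =
    //       enc.createSeeker(comparator, MemStore.NO_PERSISTENT_TS);
    //   seeker.setCurrentBuffer(encodedPayload); // payload after encoder ID
    //   do {
    //     ByteBuffer key = seeker.getKey();      // current key
    //     ByteBuffer value = seeker.getValue();  // current value
    //   } while (seeker.next());                 // false at end of block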
+
@Override
public HFileBlock readBlock(long offset, long onDiskBlockSize,
boolean cacheBlock, boolean pread, boolean isCompaction) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Sat Dec 24 21:20:39 2011
@@ -30,6 +30,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
@@ -80,17 +82,20 @@ public class HFileReaderV2 extends Abstr
* @param size Length of the stream.
* @param closeIStream Whether to close the stream.
* @param cacheConf Cache configuration.
+ * @param blockEncoder what kind of data block encoding will be used
* @throws IOException
*/
public HFileReaderV2(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long size,
- final boolean closeIStream, final CacheConfig cacheConf)
+ final boolean closeIStream, final CacheConfig cacheConf,
+ final HFileDataBlockEncoder blockEncoder)
throws IOException {
- super(path, trailer, fsdis, size, closeIStream, cacheConf);
-
+ super(path, trailer, fsdis, size, closeIStream, cacheConf,
+ blockEncoder);
trailer.expectVersion(2);
- fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
- fileSize);
+ HFileBlock.FSReaderV2 fsBlockReader = new HFileBlock.FSReaderV2(fsdis,
+ compressAlgo, fileSize, blockEncoder);
+ this.fsBlockReader = fsBlockReader;
// Comparator class name is stored in the trailer in version 2.
comparator = trailer.createComparator();
@@ -123,8 +128,10 @@ public class HFileReaderV2 extends Abstr
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
- includesMemstoreTS = (keyValueFormatVersion != null &&
- Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE);
+ includesMemstoreTS = keyValueFormatVersion != null &&
+ Bytes.toInt(keyValueFormatVersion) ==
+ HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
+ fsBlockReader.setIncludesMemstoreTS(includesMemstoreTS);
// Store all other load-on-open blocks for further consumption.
HFileBlock b;
@@ -145,9 +152,15 @@ public class HFileReaderV2 extends Abstr
* @param isCompaction is scanner being used for a compaction?
* @return Scanner on this file.
*/
- @Override
- public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+ @Override
+ public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
+ // check if we want to use data block encoding in memory
+ if (blockEncoder.useEncodedScanner(isCompaction)) {
+ return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
+ includesMemstoreTS);
+ }
+
return new ScannerV2(this, cacheBlocks, pread, isCompaction);
}
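
Whichever branch getScanner() takes, callers program against the HFileScanner
interface and never see the difference between ScannerV2 and EncodedScannerV2.
A hedged usage fragment (sketch only, not a complete program; it assumes
`reader` is an already-open reader for this file, and uses only the methods
visible in this patch):

    HFileScanner scanner = reader.getScanner(
        true,    // cacheBlocks
        false,   // pread
        false);  // isCompaction
    if (scanner.seekTo()) {            // position at the first key/value
      do {
        KeyValue kv = scanner.getKeyValue();
        // ... consume kv ...
      } while (scanner.next());
    }
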
@@ -258,6 +271,8 @@ public class HFileReaderV2 extends Abstr
if (cacheConf.isBlockCacheEnabled()) {
HFileBlock cachedBlock =
(HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
+ cachedBlock = blockEncoder.afterBlockCache(cachedBlock,
+ isCompaction, shouldIncludeMemstoreTS());
if (cachedBlock != null) {
BlockCategory blockCategory =
cachedBlock.getBlockType().getCategory();
@@ -265,8 +280,9 @@ public class HFileReaderV2 extends Abstr
getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
- if (cachedBlock.getBlockType() == BlockType.DATA)
+ if (cachedBlock.getBlockType() == BlockType.DATA) {
HFile.dataBlockReadCnt.incrementAndGet();
+ }
return cachedBlock;
}
// Carry on, please load.
@@ -292,6 +308,8 @@ public class HFileReaderV2 extends Abstr
// Cache the block
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
hfileBlock.getBlockType().getCategory())) {
+ hfileBlock = blockEncoder.beforeBlockCache(
+ hfileBlock, includesMemstoreTS);
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
cacheConf.isInMemory());
}
@@ -300,7 +318,8 @@ public class HFileReaderV2 extends Abstr
HFile.dataBlockReadCnt.incrementAndGet();
}
- return hfileBlock;
+ return blockEncoder.afterReadFromDiskAndPuttingInCache(hfileBlock,
+ isCompaction, includesMemstoreTS);
} finally {
offsetLock.releaseLockEntry(lockEntry);
}
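
Taken together, the three blockEncoder hooks in this method bracket the block
cache, and their ordering matters. A comment-level summary of the flow above
(descriptive restatement of this patch, not an API contract):

    // Cache hit : getBlock() -> afterBlockCache(block, ...)          -> return
    // Cache miss: read block from disk
    //             -> beforeBlockCache(block, ...)   (before caching)
    //             -> cacheBlock(...)
    //             -> afterReadFromDiskAndPuttingInCache(block, ...)  -> return
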
@@ -345,25 +364,154 @@ public class HFileReaderV2 extends Abstr
}
}
+ protected abstract static class AbstractScannerV2
+ extends AbstractHFileReader.Scanner {
+ protected HFileBlock block;
+
+ public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ super(r, cacheBlocks, pread, isCompaction);
+ }
+
+ /**
+ * An internal API function. Seek to the given key, optionally rewinding to
+ * the first key of the block before doing the seek.
+ *
+ * @param key key byte array
+ * @param offset key offset in the key byte array
+ * @param length key length
+     * @param rewind whether to rewind to the first key of the block before
+     *        doing the seek. If this is false, the caller is assumed never
+     *        to seek backwards; otherwise the result is undefined.
+ * @return -1 if the key is earlier than the first key of the file,
+ * 0 if we are at the given key, and 1 if we are past the given key
+ * @throws IOException
+ */
+ protected int seekTo(byte[] key, int offset, int length, boolean rewind)
+ throws IOException {
+ HFileBlockIndex.BlockIndexReader indexReader =
+ reader.getDataBlockIndexReader();
+ HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
+ block, cacheBlocks, pread, isCompaction);
+ if (seekToBlock == null) {
+ // This happens if the key e.g. falls before the beginning of the file.
+ return -1;
+ }
+ return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
+ false);
+ }
+
+ protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
+
+ protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock,
+ boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
+ throws IOException;
+
+ @Override
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ // Always rewind to the first key of the block, because the given key
+ // might be before or after the current key.
+ return seekTo(key, offset, length, true);
+ }
+
+ @Override
+ public int reseekTo(byte[] key, int offset, int length) throws IOException {
+ if (isSeeked()) {
+ ByteBuffer bb = getKey();
+ int compared = reader.getComparator().compare(key, offset,
+ length, bb.array(), bb.arrayOffset(), bb.limit());
+ if (compared < 1) {
+ // If the required key is less than or equal to current key, then
+ // don't do anything.
+ return compared;
+ }
+ }
+
+ // Don't rewind on a reseek operation, because reseek implies that we are
+ // always going forward in the file.
+ return seekTo(key, offset, length, false);
+ }
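+
+    // The -1/0/1 contract of seekTo() and the "compared < 1" early return in
+    // reseekTo() are easy to misread. A hedged sketch of turning the contract
+    // into "position at or after key" semantics, under the usual HBase
+    // interpretation that a return of 1 leaves the scanner on the greatest
+    // key <= the sought key (method name is illustrative, not in this patch):
+    //
+    //   static boolean positionAtOrAfter(HFileScanner scanner, byte[] key)
+    //       throws IOException {
+    //     int result = scanner.seekTo(key, 0, key.length);
+    //     if (result == -1) {
+    //       return scanner.seekTo();  // key precedes the file; go to first key
+    //     }
+    //     if (result == 1) {
+    //       return scanner.next();    // step past the greatest key <= `key`
+    //     }
+    //     return true;                // result == 0: exact match
+    //   }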
+
+ @Override
+ public boolean seekBefore(byte[] key, int offset, int length)
+ throws IOException {
+ HFileBlock seekToBlock =
+ reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
+ block, cacheBlocks, pread, isCompaction);
+ if (seekToBlock == null) {
+ return false;
+ }
+ ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+      if (reader.getComparator().compare(firstKey.array(),
+          firstKey.arrayOffset(), firstKey.limit(), key, offset, length)
+          == 0) {
+        // The key we are interested in is in the previous block.
+        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        if (previousBlockOffset == -1) {
+          // The key we want is the first key of the file: there is
+          // nothing before it to seek to.
+          return false;
+        }
+
+ // It is important that we compute and pass onDiskSize to the block
+ // reader so that it does not have to read the header separately to
+ // figure out the size.
+ seekToBlock = reader.readBlock(previousBlockOffset,
+ seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
+ pread, isCompaction);
+
+ // TODO shortcut: seek forward in this block to the last key of the
+ // block.
+ }
+ loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+ return true;
+ }
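+
+    // seekBefore() above answers "what is the last key strictly before this
+    // one?", returning false when nothing precedes the given key. A short
+    // hedged fragment of the typical call pattern (illustrative only):
+    //
+    //   if (scanner.seekBefore(key, 0, key.length)) {
+    //     KeyValue previous = scanner.getKeyValue();  // greatest key < `key`
+    //     // ... use previous ...
+    //   }
+    //   // else: `key` is <= the first key of the file; nothing precedes it.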
+
+ /**
+ * Scans blocks in the "scanned" section of the {@link HFile} until the next
+ * data block is found.
+ *
+ * @return the next block, or null if there are no more data blocks
+ * @throws IOException
+ */
+ protected HFileBlock readNextDataBlock() throws IOException {
+ long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+ if (block == null)
+ return null;
+
+ HFileBlock curBlock = block;
+
+ do {
+ if (curBlock.getOffset() >= lastDataBlockOffset)
+ return null;
+
+ if (curBlock.getOffset() < 0) {
+        throw new IOException("Invalid block file offset: " + curBlock);
+ }
+ curBlock = reader.readBlock(curBlock.getOffset()
+ + curBlock.getOnDiskSizeWithHeader(),
+ curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+ isCompaction);
+ } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
+ curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
+
+ return curBlock;
+ }
+ }
+
/**
* Implementation of {@link HFileScanner} interface.
*/
- protected static class ScannerV2 extends AbstractHFileReader.Scanner {
- private HFileBlock block;
+ protected static class ScannerV2 extends AbstractScannerV2 {
private HFileReaderV2 reader;
public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
- super(cacheBlocks, pread, isCompaction);
+ super(r, cacheBlocks, pread, isCompaction);
this.reader = r;
}
@Override
- public HFileReaderV2 getReader() {
- return reader;
- }
-
- @Override
public KeyValue getKeyValue() {
if (!isSeeked())
return null;
@@ -452,36 +600,6 @@ public class HFileReaderV2 extends Abstr
}
/**
- * Scans blocks in the "scanned" section of the {@link HFile} until the next
- * data block is found.
- *
- * @return the next block, or null if there are no more data blocks
- * @throws IOException
- */
- private HFileBlock readNextDataBlock() throws IOException {
- long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
- if (block == null)
- return null;
-
- HFileBlock curBlock = block;
-
- do {
- if (curBlock.getOffset() >= lastDataBlockOffset)
- return null;
-
- if (curBlock.getOffset() < 0) {
- throw new IOException("Invalid block file offset: " + block);
- }
- curBlock = reader.readBlock(curBlock.getOffset()
- + curBlock.getOnDiskSizeWithHeader(),
- curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
- isCompaction);
- } while (!curBlock.getBlockType().equals(BlockType.DATA));
-
- return curBlock;
- }
-
- /**
* Positions this scanner at the start of the file.
*
* @return false if empty file; i.e. a call to next would return false and
@@ -517,70 +635,7 @@ public class HFileReaderV2 extends Abstr
}
@Override
- public int seekTo(byte[] key) throws IOException {
- return seekTo(key, 0, key.length);
- }
-
- /**
- * An internal API function. Seek to the given key, optionally rewinding to
- * the first key of the block before doing the seek.
- *
- * @param key key byte array
- * @param offset key offset in the key byte array
- * @param length key length
- * @param rewind whether to rewind to the first key of the block before
- * doing the seek. If this is false, we are assuming we never go
- * back, otherwise the result is undefined.
- * @return -1 if the key is earlier than the first key of the file,
- * 0 if we are at the given key, and 1 if we are past the given key
- * @throws IOException
- */
- private int seekTo(byte[] key, int offset, int length, boolean rewind)
- throws IOException {
- HFileBlockIndex.BlockIndexReader indexReader =
- reader.getDataBlockIndexReader();
- HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
- block, cacheBlocks, pread, isCompaction);
-
- if (seekToBlock == null) {
- // This happens if the key e.g. falls before the beginning of the file.
- return -1;
- }
- return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
- false);
- }
-
- @Override
- public int seekTo(byte[] key, int offset, int length) throws IOException {
- // Always rewind to the first key of the block, because the given key
- // might be before or after the current key.
- return seekTo(key, offset, length, true);
- }
-
- @Override
- public int reseekTo(byte[] key) throws IOException {
- return reseekTo(key, 0, key.length);
- }
-
- @Override
- public int reseekTo(byte[] key, int offset, int length) throws IOException {
- if (isSeeked()) {
- ByteBuffer bb = getKey();
- int compared = reader.getComparator().compare(key, offset,
- length, bb.array(), bb.arrayOffset(), bb.limit());
- if (compared < 1) {
- // If the required key is less than or equal to current key, then
- // don't do anything.
- return compared;
- }
- }
-
- // Don't rewind on a reseek operation, because reseek implies that we are
- // always going forward in the file.
- return seekTo(key, offset, length, false);
- }
-
- private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+ protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
byte[] key, int offset, int length, boolean seekBefore)
throws IOException {
if (block == null || block.getOffset() != seekToBlock.getOffset()) {
@@ -599,6 +654,13 @@ public class HFileReaderV2 extends Abstr
*/
private void updateCurrBlock(HFileBlock newBlock) {
block = newBlock;
+
+ // sanity check
+ if (block.getBlockType() != BlockType.DATA) {
+ throw new IllegalStateException(
+ "ScannerV2 works only on data blocks");
+ }
+
blockBuffer = block.getBufferWithoutHeader();
readKeyValueLen();
blockFetches++;
@@ -713,11 +775,7 @@ public class HFileReaderV2 extends Abstr
}
@Override
- public boolean seekBefore(byte[] key) throws IOException {
- return seekBefore(key, 0, key.length);
- }
-
- private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+ protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
ByteBuffer buffer = curBlock.getBufferWithoutHeader();
// It is safe to manipulate this buffer because we own the buffer object.
buffer.rewind();
@@ -730,53 +788,174 @@ public class HFileReaderV2 extends Abstr
}
@Override
- public boolean seekBefore(byte[] key, int offset, int length)
- throws IOException {
- HFileBlock seekToBlock =
- reader.getDataBlockIndexReader().seekToDataBlock(key, offset,
- length, block, cacheBlocks, pread, isCompaction);
- if (seekToBlock == null) {
+ public String getKeyString() {
+ return Bytes.toStringBinary(blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE, currKeyLen);
+ }
+
+ @Override
+ public String getValueString() {
+ return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+ currValueLen);
+ }
+ }
+
+ /**
+   * A scanner that operates on encoded data blocks.
+ */
+ protected static class EncodedScannerV2 extends AbstractScannerV2 {
+ private DataBlockEncoder.EncodedSeeker seeker = null;
+ private DataBlockEncoder dataBlockEncoder = null;
+ private final boolean includesMemstoreTS;
+
+ public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
+ boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
+ super(reader, cacheBlocks, pread, isCompaction);
+ this.includesMemstoreTS = includesMemstoreTS;
+ }
+
+ private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
+ this.dataBlockEncoder = dataBlockEncoder;
+ seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
+ includesMemstoreTS);
+ }
+
+ /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+ *
+ * @param newBlock the block to make current
+ */
+ private void updateCurrentBlock(HFileBlock newBlock) {
+ block = newBlock;
+
+ // sanity checks
+ if (block.getBlockType() != BlockType.ENCODED_DATA) {
+ throw new IllegalStateException(
+ "EncodedScannerV2 works only on encoded data blocks");
+ }
+
+ short dataBlockEncoderId = block.getDataBlockEncodingId();
+ if (dataBlockEncoder == null
+ || !DataBlockEncodings.isCorrectEncoder(dataBlockEncoder,
+ dataBlockEncoderId)) {
+ DataBlockEncoder encoder =
+ DataBlockEncodings.getDataBlockEncoderFromId(dataBlockEncoderId);
+ setDataBlockEncoder(encoder);
+ }
+
+ seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
+ blockFetches++;
+ }
+
+ private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
+ ByteBuffer origBlock = newBlock.getBufferReadOnly();
+ ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
+ origBlock.arrayOffset() + HFileBlock.HEADER_SIZE +
+ DataBlockEncodings.ID_SIZE,
+ origBlock.limit() - HFileBlock.HEADER_SIZE -
+ DataBlockEncodings.ID_SIZE).slice();
+ return encodedBlock;
+ }
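+
+    // The wrap-plus-slice in getEncodedBuffer() is the version-2 counterpart
+    // of the getShort-plus-slice idiom in the V1 scanner: skip the block
+    // header and the encoder ID, and hand back a zero-based view. A
+    // self-contained toy reconstruction with made-up sizes (the constants
+    // below stand in for HFileBlock.HEADER_SIZE and DataBlockEncodings.ID_SIZE
+    // and are assumptions, not the real values):
+    //
+    //   import java.nio.ByteBuffer;
+    //
+    //   public class EncodedBufferDemo {
+    //     static final int HEADER_SIZE = 24;  // stand-in, not the real constant
+    //     static final int ID_SIZE = 2;       // stand-in, not the real constant
+    //
+    //     public static void main(String[] args) {
+    //       byte[] raw = new byte[HEADER_SIZE + ID_SIZE + 8];
+    //       raw[HEADER_SIZE + ID_SIZE] = 99;  // first payload byte
+    //       ByteBuffer orig = ByteBuffer.wrap(raw);
+    //
+    //       ByteBuffer encoded = ByteBuffer.wrap(orig.array(),
+    //           orig.arrayOffset() + HEADER_SIZE + ID_SIZE,
+    //           orig.limit() - HEADER_SIZE - ID_SIZE).slice();
+    //
+    //       System.out.println(encoded.get(0));  // 99: payload is zero-based
+    //     }
+    //   }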
+
+ @Override
+ public boolean seekTo() throws IOException {
+ if (reader == null) {
return false;
}
- ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
- if (reader.getComparator().compare(firstKey.array(),
- firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
- {
- long previousBlockOffset = seekToBlock.getPrevBlockOffset();
- // The key we are interested in
- if (previousBlockOffset == -1) {
- // we have a 'problem', the key we want is the first of the file.
- return false;
- }
- // It is important that we compute and pass onDiskSize to the block
- // reader so that it does not have to read the header separately to
- // figure out the size.
- seekToBlock = reader.readBlock(previousBlockOffset,
- seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
- pread, isCompaction);
+ if (reader.getTrailer().getEntryCount() == 0) {
+ // No data blocks.
+ return false;
+ }
- // TODO shortcut: seek forward in this block to the last key of the
- // block.
+ long firstDataBlockOffset =
+ reader.getTrailer().getFirstDataBlockOffset();
+ if (block != null && block.getOffset() == firstDataBlockOffset) {
+ seeker.rewind();
+ return true;
}
- loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+
+ block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+ isCompaction);
+ if (block.getOffset() < 0) {
+ throw new IOException("Invalid block offset: " + block.getOffset());
+ }
+ updateCurrentBlock(block);
return true;
}
@Override
+ public boolean next() throws IOException {
+ boolean isValid = seeker.next();
+ if (!isValid) {
+ block = readNextDataBlock();
+ isValid = block != null;
+ if (isValid) {
+ updateCurrentBlock(block);
+ }
+ }
+ return isValid;
+ }
+
+ @Override
+ public ByteBuffer getKey() {
+ assertValidSeek();
+ return seeker.getKey();
+ }
+
+ @Override
+ public ByteBuffer getValue() {
+ assertValidSeek();
+ return seeker.getValue();
+ }
+
+ @Override
+ public KeyValue getKeyValue() {
+ if (block == null) {
+ return null;
+ }
+ return seeker.getKeyValueObject();
+ }
+
+ @Override
public String getKeyString() {
- return Bytes.toStringBinary(blockBuffer.array(),
- blockBuffer.arrayOffset() + blockBuffer.position()
- + KEY_VALUE_LEN_SIZE, currKeyLen);
+ ByteBuffer keyBuffer = getKey();
+ return Bytes.toStringBinary(keyBuffer.array(),
+ keyBuffer.arrayOffset(), keyBuffer.limit());
}
@Override
public String getValueString() {
- return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
- + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
- currValueLen);
+ ByteBuffer valueBuffer = getValue();
+ return Bytes.toStringBinary(valueBuffer.array(),
+ valueBuffer.arrayOffset(), valueBuffer.limit());
+ }
+
+ private void assertValidSeek() {
+ if (block == null) {
+ throw new NotSeekedException();
+ }
}
+ @Override
+ protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+ return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
+ }
+
+ @Override
+ protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+ byte[] key, int offset, int length, boolean seekBefore)
+ throws IOException {
+ if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+ updateCurrentBlock(seekToBlock);
+ } else if (rewind) {
+ seeker.rewind();
+ }
+ return seeker.blockSeekTo(key, offset, length, seekBefore);
+ }
}
/**