Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC
svn commit: r1223020 [2/5] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/io/encoding/
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/mapr...
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Compress using:
+ * - store the size of the common prefix
+ * - save the column family once; it is the same for the whole HFile
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
+ * - use flag bits to avoid duplicating the key length, value length
+ *   and type when they are the same as in the previous KeyValue
+ * - store the length of the timestamp field in 3 bits
+ * - optionally store a diff from the previous timestamp instead of the
+ *   actual value
+ *
+ * Format:
+ * - 1 byte: flag
+ * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
+ * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
+ * - 1-5 bytes: prefix length
+ * - ... bytes: rest of the row (if prefix length is small enough)
+ * - ... bytes: qualifier (or suffix depending on prefix length)
+ * - 1-8 bytes: timestamp or diff
+ * - 1 byte: type (only if FLAG_SAME_TYPE is not set in the flag)
+ * - ... bytes: value
+ */
+public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
+ static final int FLAG_SAME_KEY_LENGTH = 1;
+ static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
+ static final int FLAG_SAME_TYPE = 1 << 2;
+ static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
+ static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
+ static final int SHIFT_TIMESTAMP_LENGTH = 4;
+ static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
+
+ protected static class DiffCompressionState extends CompressionState {
+ long timestamp;
+ byte[] familyNameWithSize;
+
+ @Override
+ protected void readTimestamp(ByteBuffer in) {
+ timestamp = in.getLong();
+ }
+
+ @Override
+ void copyFrom(CompressionState state) {
+ super.copyFrom(state);
+ DiffCompressionState state2 = (DiffCompressionState) state;
+ timestamp = state2.timestamp;
+ }
+ }
+
+ private void compressSingleKeyValue(DiffCompressionState previousState,
+ DiffCompressionState currentState, DataOutputStream out,
+ ByteBuffer in) throws IOException {
+ byte flag = 0;
+ int kvPos = in.position();
+ int keyLength = in.getInt();
+ int valueLength = in.getInt();
+
+ long timestamp;
+ long diffTimestamp = 0;
+ int diffTimestampFitsInBytes = 0;
+
+ int commonPrefix;
+
+ int timestampFitsInBytes;
+
+ if (previousState.isFirst()) {
+ currentState.readKey(in, keyLength, valueLength);
+ currentState.prevOffset = kvPos;
+ timestamp = currentState.timestamp;
+ if (timestamp < 0) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ timestamp = -timestamp;
+ }
+ timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+ flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ commonPrefix = 0;
+
+ // put column family
+ in.mark();
+ ByteBufferUtils.skip(in, currentState.rowLength
+ + KeyValue.ROW_LENGTH_SIZE);
+ ByteBufferUtils.copyToStream(out, in, currentState.familyLength
+ + KeyValue.FAMILY_LENGTH_SIZE);
+ in.reset();
+ } else {
+ // find a common prefix and skip it
+ commonPrefix =
+ ByteBufferUtils.findCommonPrefix(in, in.position(),
+ previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE);
+ // don't compress timestamp and type using prefix
+
+ currentState.readKey(in, keyLength, valueLength,
+ commonPrefix, previousState);
+ currentState.prevOffset = kvPos;
+ timestamp = currentState.timestamp;
+ boolean minusTimestamp = timestamp < 0;
+ if (minusTimestamp) {
+ timestamp = -timestamp;
+ }
+ timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+ if (keyLength == previousState.keyLength) {
+ flag |= FLAG_SAME_KEY_LENGTH;
+ }
+ if (valueLength == previousState.valueLength) {
+ flag |= FLAG_SAME_VALUE_LENGTH;
+ }
+ if (currentState.type == previousState.type) {
+ flag |= FLAG_SAME_TYPE;
+ }
+
+ // encode timestamp
+ diffTimestamp = previousState.timestamp - currentState.timestamp;
+ boolean minusDiffTimestamp = diffTimestamp < 0;
+ if (minusDiffTimestamp) {
+ diffTimestamp = -diffTimestamp;
+ }
+ diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
+ if (diffTimestampFitsInBytes < timestampFitsInBytes) {
+ flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ flag |= FLAG_TIMESTAMP_IS_DIFF;
+ if (minusDiffTimestamp) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ }
+ } else {
+ flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ if (minusTimestamp) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ }
+ }
+ }
+
+ ByteBufferUtils.copyToStream(out, flag);
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, keyLength);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ }
+
+ ByteBufferUtils.putCompressedInt(out, commonPrefix);
+ ByteBufferUtils.skip(in, commonPrefix);
+
+ if (previousState.isFirst() ||
+ commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+ int restRowLength =
+ currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+ ByteBufferUtils.copyToStream(out, in, restRowLength);
+ ByteBufferUtils.skip(in, currentState.familyLength +
+ KeyValue.FAMILY_LENGTH_SIZE);
+ ByteBufferUtils.copyToStream(out, in, currentState.qualifierLength);
+ } else {
+ ByteBufferUtils.copyToStream(out, in,
+ keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
+ }
+
+ if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
+ ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
+ } else {
+ ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
+ }
+
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ ByteBufferUtils.copyToStream(out, currentState.type);
+ }
+ ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
+
+ ByteBufferUtils.copyToStream(out, in, valueLength);
+ }
+
+ private void uncompressSingleKeyValue(DataInputStream source,
+ ByteBuffer buffer,
+ DiffCompressionState state)
+ throws IOException, EncoderBufferTooSmallException {
+ // read the column family at the beginning
+ if (state.isFirst()) {
+ state.familyLength = source.readByte();
+ state.familyNameWithSize =
+ new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
+ state.familyNameWithSize[0] = state.familyLength;
+ source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
+ state.familyLength);
+ }
+
+ // read flag
+ byte flag = source.readByte();
+
+ // read key/value/common lengths
+ int keyLength;
+ int valueLength;
+ if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
+ keyLength = state.keyLength;
+ } else {
+ keyLength = ByteBufferUtils.readCompressedInt(source);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
+ valueLength = state.valueLength;
+ } else {
+ valueLength = ByteBufferUtils.readCompressedInt(source);
+ }
+ int commonPrefix = ByteBufferUtils.readCompressedInt(source);
+
+ // create the KeyValue buffer and fill in its length prefix
+ int keyOffset = buffer.position();
+ ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
+ + KeyValue.ROW_OFFSET);
+ buffer.putInt(keyLength);
+ buffer.putInt(valueLength);
+
+ // copy common from previous key
+ if (commonPrefix > 0) {
+ ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset
+ + KeyValue.ROW_OFFSET, commonPrefix);
+ }
+
+ // copy the rest of the key from the buffer
+ int keyRestLength;
+ if (state.isFirst() || commonPrefix <
+ state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+ // omit the family part of the key, it is always the same
+ short rowLength;
+ int rowRestLength;
+
+ // check length of row
+ if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
+ // not yet copied, do it now
+ ByteBufferUtils.copyFromStream(source, buffer,
+ KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+ ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
+ rowLength = buffer.getShort();
+ rowRestLength = rowLength;
+ } else {
+ // already in buffer, just read it
+ rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
+ rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+ }
+
+ // copy the rest of row
+ ByteBufferUtils.copyFromStream(source, buffer, rowRestLength);
+ state.rowLength = rowLength;
+
+ // copy the column family
+ buffer.put(state.familyNameWithSize);
+
+ keyRestLength = keyLength - rowLength -
+ state.familyNameWithSize.length -
+ (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
+ } else {
+ // the row (with its length prefix) is the same as in the previous KeyValue
+ keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
+ }
+ // copy the rest of the key after the column family (the qualifier)
+ ByteBufferUtils.copyFromStream(source, buffer, keyRestLength);
+
+ // handle timestamp
+ int timestampFitsInBytes =
+ ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
+ long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
+ if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
+ timestamp = -timestamp;
+ }
+ if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
+ timestamp = state.timestamp - timestamp;
+ }
+ buffer.putLong(timestamp);
+
+ // copy the type field
+ byte type;
+ if ((flag & FLAG_SAME_TYPE) != 0) {
+ type = state.type;
+ } else {
+ type = source.readByte();
+ }
+ buffer.put(type);
+
+ // copy value part
+ ByteBufferUtils.copyFromStream(source, buffer, valueLength);
+
+ state.keyLength = keyLength;
+ state.valueLength = valueLength;
+ state.prevOffset = keyOffset;
+ state.timestamp = timestamp;
+ state.type = type;
+ // state.qualifier is unused
+ }
+
+ @Override
+ public void compressKeyValues(DataOutputStream out,
+ ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+ in.rewind();
+ ByteBufferUtils.putInt(out, in.limit());
+ DiffCompressionState previousState = new DiffCompressionState();
+ DiffCompressionState currentState = new DiffCompressionState();
+ while (in.hasRemaining()) {
+ compressSingleKeyValue(previousState, currentState,
+ out, in);
+ afterEncodingKeyValue(in, out, includesMemstoreTS);
+
+ // swap previousState <-> currentState
+ DiffCompressionState tmp = previousState;
+ previousState = currentState;
+ currentState = tmp;
+ }
+ }
+
+ @Override
+ public ByteBuffer uncompressKeyValues(DataInputStream source,
+ int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ throws IOException {
+ int decompressedSize = source.readInt();
+ ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+ allocHeaderLength);
+ buffer.position(allocHeaderLength);
+ DiffCompressionState state = new DiffCompressionState();
+ while (source.available() > skipLastBytes) {
+ uncompressSingleKeyValue(source, buffer, state);
+ afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+ }
+
+ if (source.available() != skipLastBytes) {
+ throw new IllegalStateException("Read too many bytes.");
+ }
+
+ return buffer;
+ }
+
+ @Override
+ public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+ block.mark();
+ block.position(Bytes.SIZEOF_INT);
+ byte familyLength = block.get();
+ ByteBufferUtils.skip(block, familyLength);
+ byte flag = block.get();
+ int keyLength = ByteBufferUtils.readCompressedInt(block);
+ ByteBufferUtils.readCompressedInt(block); // valueLength
+ ByteBufferUtils.readCompressedInt(block); // commonLength
+ ByteBuffer result = ByteBuffer.allocate(keyLength);
+
+ // copy row
+ int pos = result.arrayOffset();
+ block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
+ pos += Bytes.SIZEOF_SHORT;
+ short rowLength = result.getShort();
+ block.get(result.array(), pos, rowLength);
+ pos += rowLength;
+
+ // copy family
+ int savePosition = block.position();
+ block.position(Bytes.SIZEOF_INT);
+ block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
+ pos += familyLength + Bytes.SIZEOF_BYTE;
+
+ // copy qualifier
+ block.position(savePosition);
+ int qualifierLength =
+ keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
+ block.get(result.array(), pos, qualifierLength);
+ pos += qualifierLength;
+
+ // copy the timestamp and type
+ int timestampFitInBytes =
+ ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
+ long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
+ if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
+ timestamp = -timestamp;
+ }
+ result.putLong(pos, timestamp);
+ pos += Bytes.SIZEOF_LONG;
+ block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
+
+ block.reset();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return DiffKeyDeltaEncoder.class.getSimpleName();
+ }
+
+ @Override
+ public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+ final boolean includesMemstoreTS) {
+ return new BufferedEncodedSeeker(comparator) {
+ private static final int TIMESTAMP_WITH_TYPE_LENGTH =
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
+ private byte[] familyNameWithSize;
+ private int rowLengthWithSize;
+ private long timestamp;
+
+ private void decode(boolean isFirst) {
+ byte flag = currentBuffer.get();
+ byte type = 0;
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ if (!isFirst) {
+ type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
+ }
+ current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ current.valueLength =
+ ByteBufferUtils.readCompressedInt(currentBuffer);
+ }
+ current.lastCommonPrefix =
+ ByteBufferUtils.readCompressedInt(currentBuffer);
+
+ current.ensureSpaceForKey();
+
+ if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
+ // length of row is different, copy everything except family
+
+ // copy the row size
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
+ rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+ Bytes.SIZEOF_SHORT;
+
+ // copy the rest of row
+ currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
+ rowLengthWithSize - Bytes.SIZEOF_SHORT);
+
+ // copy the column family
+ System.arraycopy(familyNameWithSize, 0,
+ current.keyBuffer, rowLengthWithSize, familyNameWithSize.length);
+
+ // copy the qualifier
+ currentBuffer.get(current.keyBuffer,
+ rowLengthWithSize + familyNameWithSize.length,
+ current.keyLength - rowLengthWithSize -
+ familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
+ } else if (current.lastCommonPrefix < rowLengthWithSize) {
+ // we have to copy part of row and qualifier,
+ // but column family is in right place
+
+ // before column family (rest of row)
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ rowLengthWithSize - current.lastCommonPrefix);
+
+ // after column family (qualifier)
+ currentBuffer.get(current.keyBuffer,
+ rowLengthWithSize + familyNameWithSize.length,
+ current.keyLength - rowLengthWithSize -
+ familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
+ } else {
+ // copy just the ending
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
+ current.lastCommonPrefix);
+ }
+
+
+ // timestamp
+ int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
+ int timestampFitInBytes = 1 +
+ ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
+ long timestampOrDiff =
+ ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
+ if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
+ timestampOrDiff = -timestampOrDiff;
+ }
+ if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
+ timestamp = timestampOrDiff;
+ } else { // it is diff
+ timestamp = timestamp - timestampOrDiff;
+ }
+ Bytes.putLong(current.keyBuffer, pos, timestamp);
+ pos += Bytes.SIZEOF_LONG;
+
+ // type
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
+ } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ current.keyBuffer[pos] = type;
+ }
+
+ current.valueOffset = currentBuffer.position();
+ ByteBufferUtils.skip(currentBuffer, current.valueLength);
+
+ if (includesMemstoreTS) {
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ } else {
+ current.memstoreTS = 0;
+ }
+ current.nextKvOffset = currentBuffer.position();
+ }
+
+ @Override
+ protected void decodeFirst() {
+ ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+
+ // read column family
+ byte familyNameLength = currentBuffer.get();
+ familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
+ familyNameWithSize[0] = familyNameLength;
+ currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
+ familyNameLength);
+ decode(true);
+ }
+
+ @Override
+ protected void decodeNext() {
+ decode(false);
+ }
+ };
+ }
+}
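
To make the Diff format above concrete, here is a minimal, self-contained
sketch of how the flag byte is packed and unpacked. The class name
DiffFlagDemo and the local longFitsIn helper are illustrative stand-ins
(the encoder itself uses ByteBufferUtils.longFitsIn); the constants are
copied from DiffKeyDeltaEncoder above.

public class DiffFlagDemo {
  static final int FLAG_SAME_KEY_LENGTH = 1;
  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
  static final int FLAG_SAME_TYPE = 1 << 2;
  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
  static final int SHIFT_TIMESTAMP_LENGTH = 4;
  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;

  // Mirrors ByteBufferUtils.longFitsIn: bytes needed for a non-negative long.
  static int longFitsIn(long value) {
    int bytes = 1;
    while ((value >>>= 8) != 0) {
      bytes++;
    }
    return bytes;
  }

  public static void main(String[] args) {
    long timestamp = -1324760439000L;    // negative timestamps are legal
    int flag = 0;
    if (timestamp < 0) {
      flag |= FLAG_TIMESTAMP_SIGN;       // sign goes into a flag bit
      timestamp = -timestamp;
    }
    int tsBytes = longFitsIn(timestamp);             // 1..8
    flag |= (tsBytes - 1) << SHIFT_TIMESTAMP_LENGTH; // 3-bit length field
    flag |= FLAG_SAME_KEY_LENGTH | FLAG_SAME_TYPE;   // reuse bits set

    // Decoder side: recover the timestamp width from the same flag.
    int decodedTsBytes =
        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
    System.out.println("flag=0x" + Integer.toHexString(flag)
        + " timestampBytes=" + decodedTsBytes
        + " negative=" + ((flag & FLAG_TIMESTAMP_SIGN) != 0));
  }
}
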
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * Encapsulates a data block compressed using a particular encoding algorithm.
+ * Useful for testing and benchmarking.
+ */
+public class EncodedDataBlock {
+ private static final int BUFFER_SIZE = 4 * 1024;
+ protected DataBlockEncoder dataBlockEncoder;
+ ByteArrayOutputStream uncompressedOutputStream;
+ ByteBuffer uncompressedBuffer;
+ private byte[] cacheCompressData;
+ private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+ private boolean includesMemstoreTS;
+
+ /**
+ * Create a buffer which will be encoded using dataBlockEncoder.
+ * @param dataBlockEncoder Algorithm used for compression.
+ * @param includesMemstoreTS true if each KeyValue is followed by a memstore
+ *          timestamp in the stream.
+ */
+ public EncodedDataBlock(DataBlockEncoder dataBlockEncoder,
+ boolean includesMemstoreTS) {
+ this.dataBlockEncoder = dataBlockEncoder;
+ this.includesMemstoreTS = includesMemstoreTS;
+ uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE);
+ }
+
+ /**
+ * Add a KeyValue to the encoding buffer. Invalidates any cached encoding.
+ * @param kv Item to be added.
+ */
+ public void addKv(KeyValue kv) {
+ cacheCompressData = null;
+ uncompressedOutputStream.write(
+ kv.getBuffer(), kv.getOffset(), kv.getLength());
+ }
+
+ /**
+ * Provides access to the KeyValues as seen after an encode/decode
+ * round trip.
+ * @return Forward sequential iterator.
+ */
+ public Iterator<KeyValue> getIterator() {
+ final int uncompressedSize = uncompressedOutputStream.size();
+ final ByteArrayInputStream bais = new ByteArrayInputStream(
+ getCompressedData());
+ final DataInputStream dis = new DataInputStream(bais);
+
+
+ return new Iterator<KeyValue>() {
+ private ByteBuffer decompressedData = null;
+
+ @Override
+ public boolean hasNext() {
+ if (decompressedData == null) {
+ return uncompressedSize > 0;
+ }
+ return decompressedData.hasRemaining();
+ }
+
+ @Override
+ public KeyValue next() {
+ if (decompressedData == null) {
+ try {
+ decompressedData = dataBlockEncoder.uncompressKeyValues(
+ dis, includesMemstoreTS);
+ } catch (IOException e) {
+ throw new RuntimeException("Problem with data block encoder, " +
+ "most likely it requested more bytes than are available.", e);
+ }
+ decompressedData.rewind();
+ }
+
+ int offset = decompressedData.position();
+ KeyValue kv = new KeyValue(decompressedData.array(), offset);
+ decompressedData.position(offset + kv.getLength());
+
+ return kv;
+ }
+
+ @Override
+ public void remove() {
+ throw new NotImplementedException("remove() is not supported!");
+ }
+
+ @Override
+ public String toString() {
+ return "Iterator of: " + dataBlockEncoder.getClass().getName();
+ }
+
+ };
+ }
+
+ /**
+ * Find the size of the minimal buffer that could store the compressed data.
+ * @return Size in bytes of compressed data.
+ */
+ public int getSize() {
+ return getCompressedData().length;
+ }
+
+ /**
+ * Find the size of the compressed data, assuming the buffer is compressed
+ * using the given algorithm.
+ * @param compressor Algorithm used for compression.
+ * @param buffer Array to be compressed.
+ * @param offset Offset to beginning of the data.
+ * @param length Length to be compressed.
+ * @return Size of compressed data in bytes.
+ */
+ public static int checkCompressedSize(Compressor compressor, byte[] buffer,
+ int offset, int length) {
+ byte[] compressedBuffer = new byte[buffer.length];
+ // in fact the buffer could be of any positive size
+ compressor.setInput(buffer, offset, length);
+ compressor.finish();
+ int currentPos = 0;
+ while (!compressor.finished()) {
+ try {
+ // we don't care about the compressed data itself,
+ // we just want to calculate the number of bytes
+ currentPos += compressor.compress(compressedBuffer, 0,
+ compressedBuffer.length);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "For some reason compressor couldn't read data. " +
+ "It is likely a problem with " +
+ compressor.getClass().getName(), e);
+ }
+ }
+ return currentPos;
+ }
+
+ /**
+ * Estimate size after second stage of compression (e.g. LZO).
+ * @param compressor Algorithm which will be used for compressions.
+ * @return Size after second stage of compression.
+ */
+ public int checkCompressedSize(Compressor compressor) {
+ // compress
+ byte[] compressedBytes = getCompressedData();
+ return checkCompressedSize(compressor, compressedBytes, 0,
+ compressedBytes.length);
+ }
+
+ private byte[] getCompressedData() {
+ // is cached
+ if (cacheCompressData != null) {
+ return cacheCompressData;
+ }
+ cacheCompressData = doCompressData();
+
+ return cacheCompressData;
+ }
+
+ private ByteBuffer getUncompressedBuffer() {
+ if (uncompressedBuffer == null ||
+ uncompressedBuffer.limit() < uncompressedOutputStream.size()) {
+ uncompressedBuffer = ByteBuffer.wrap(
+ uncompressedOutputStream.toByteArray());
+ }
+ return uncompressedBuffer;
+ }
+
+ /**
+ * Do the compression.
+ * @return Compressed bytes.
+ */
+ public byte[] doCompressData() {
+ compressedStream.reset();
+ DataOutputStream dataOut = new DataOutputStream(compressedStream);
+ try {
+ this.dataBlockEncoder.compressKeyValues(
+ dataOut, getUncompressedBuffer(), includesMemstoreTS);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Bug in decoding part of algorithm %s. " +
+ "Probably it requested more bytes than are available.",
+ toString()), e);
+ }
+ return compressedStream.toByteArray();
+ }
+
+ @Override
+ public String toString() {
+ return dataBlockEncoder.toString();
+ }
+
+ /**
+ * Get uncompressed buffer.
+ * @return The buffer.
+ */
+ public byte[] getRawKeyValues() {
+ return uncompressedOutputStream.toByteArray();
+ }
+}
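
As a rough usage sketch (assuming the classes from this commit are on the
classpath; the sample rows and values are made up), EncodedDataBlock can
compare the raw and encoded sizes of a set of KeyValues:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
import org.apache.hadoop.hbase.io.encoding.PrefixKeyDeltaEncoder;
import org.apache.hadoop.hbase.util.Bytes;

public class EncodedDataBlockDemo {
  public static void main(String[] args) {
    // false = KeyValues are not followed by a memstore timestamp
    EncodedDataBlock block =
        new EncodedDataBlock(new PrefixKeyDeltaEncoder(), false);
    byte[] family = Bytes.toBytes("cf");
    for (int i = 0; i < 100; i++) {
      // rows share a long common prefix, so prefix encoding should win
      block.addKv(new KeyValue(Bytes.toBytes("row-common-prefix-" + i),
          family, Bytes.toBytes("qual"), 1L, Bytes.toBytes("value-" + i)));
    }
    System.out.println("raw=" + block.getRawKeyValues().length
        + " encoded=" + block.getSize());
  }
}
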
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+/**
+ * Internal error which indicates a bug in a data block encoding algorithm.
+ */
+public class EncoderBufferTooSmallException extends RuntimeException {
+ private static final long serialVersionUID = 4767495176134878737L;
+
+ public EncoderBufferTooSmallException(String message) {
+ super(message);
+ }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
+ *
+ * Compress using:
+ * - store the size of the common prefix
+ * - save the column family once in the first KeyValue
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
+ * - use flag bits to avoid duplicating the key length, value length
+ *   and type when they are the same as in the previous KeyValue
+ * - store, in 3 bits, the length of the timestamp prefix shared
+ *   with the previous KeyValue's timestamp
+ * - use one bit to omit the value when it is the same as in the
+ *   previous KeyValue
+ *
+ * Format:
+ * - 1 byte: flag
+ * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
+ * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
+ * - 1-5 bytes: prefix length
+ * - ... bytes: rest of the row (if prefix length is small enough)
+ * - ... bytes: qualifier (or suffix depending on prefix length)
+ * - 1-8 bytes: timestamp suffix
+ * - 1 byte: type (only if FLAG_SAME_TYPE is not set in the flag)
+ * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
+ *
+ */
+public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
+ final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
+ final int SHIFT_TIMESTAMP_LENGTH = 0;
+ final int FLAG_SAME_KEY_LENGTH = 1 << 3;
+ final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
+ final int FLAG_SAME_TYPE = 1 << 5;
+ final int FLAG_SAME_VALUE = 1 << 6;
+
+ private static class FastDiffCompressionState extends CompressionState {
+ byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
+ int prevTimestampOffset;
+
+ @Override
+ protected void readTimestamp(ByteBuffer in) {
+ in.get(timestamp);
+ }
+
+ @Override
+ void copyFrom(CompressionState state) {
+ super.copyFrom(state);
+ FastDiffCompressionState state2 = (FastDiffCompressionState) state;
+ System.arraycopy(state2.timestamp, 0, timestamp, 0,
+ KeyValue.TIMESTAMP_SIZE);
+ prevTimestampOffset = state2.prevTimestampOffset;
+ }
+ }
+
+ private void compressSingleKeyValue(
+ FastDiffCompressionState previousState,
+ FastDiffCompressionState currentState,
+ OutputStream out, ByteBuffer in) throws IOException {
+ currentState.prevOffset = in.position();
+ int keyLength = in.getInt();
+ int valueOffset = currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
+ int valueLength = in.getInt();
+ byte flag = 0;
+
+ if (previousState.isFirst()) {
+ // copy the whole key; there is no previous key to share a prefix with
+ ByteBufferUtils.copyToStream(out, flag);
+ ByteBufferUtils.putCompressedInt(out, keyLength);
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ ByteBufferUtils.putCompressedInt(out, 0);
+
+ currentState.readKey(in, keyLength, valueLength);
+
+ ByteBufferUtils.copyToStream(out, in, keyLength + valueLength);
+ } else {
+ // find a common prefix and skip it
+ int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
+ previousState.prevOffset + KeyValue.ROW_OFFSET,
+ keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
+
+ currentState.readKey(in, keyLength, valueLength,
+ commonPrefix, previousState);
+
+ if (keyLength == previousState.keyLength) {
+ flag |= FLAG_SAME_KEY_LENGTH;
+ }
+ if (valueLength == previousState.valueLength) {
+ flag |= FLAG_SAME_VALUE_LENGTH;
+ }
+ if (currentState.type == previousState.type) {
+ flag |= FLAG_SAME_TYPE;
+ }
+
+ int prefixTimestamp = findCommonTimestampPrefix(
+ currentState, previousState);
+ flag |= (prefixTimestamp) << SHIFT_TIMESTAMP_LENGTH;
+
+ if (ByteBufferUtils.arePartsEqual(in,
+ previousState.prevOffset + previousState.keyLength + KeyValue.ROW_OFFSET,
+ previousState.valueLength, valueOffset, valueLength)) {
+ flag |= FLAG_SAME_VALUE;
+ }
+
+ ByteBufferUtils.copyToStream(out, flag);
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, keyLength);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ }
+ ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+ ByteBufferUtils.skip(in, commonPrefix);
+ if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+ ByteBufferUtils.copyToStream(out, in,
+ currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+ ByteBufferUtils.skip(in, currentState.familyLength +
+ KeyValue.FAMILY_LENGTH_SIZE);
+ ByteBufferUtils.copyToStream(out, in, currentState.qualifierLength);
+ } else {
+ int restKeyLength = keyLength - commonPrefix -
+ KeyValue.TIMESTAMP_TYPE_SIZE;
+ ByteBufferUtils.copyToStream(out, in, restKeyLength);
+ }
+ ByteBufferUtils.skip(in, prefixTimestamp);
+ ByteBufferUtils.copyToStream(out, in,
+ KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
+
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ valueOffset -= KeyValue.TYPE_SIZE;
+ valueLength += KeyValue.TYPE_SIZE;
+ }
+
+ ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
+
+ if ((flag & FLAG_SAME_VALUE) == 0 ) {
+ ByteBufferUtils.copyToStream(out, in, valueOffset, valueLength);
+ } else {
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ ByteBufferUtils.copyToStream(out, currentState.type);
+ }
+ }
+ }
+ }
+
+ private int findCommonTimestampPrefix(FastDiffCompressionState left,
+ FastDiffCompressionState right) {
+ int prefixTimestamp = 0;
+ while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
+ left.timestamp[prefixTimestamp]
+ == right.timestamp[prefixTimestamp]) {
+ prefixTimestamp++;
+ }
+ return prefixTimestamp; // has to be at most 7 bytes
+ }
+
+ private void uncompressSingleKeyValue(DataInputStream source,
+ ByteBuffer buffer, FastDiffCompressionState state)
+ throws IOException, EncoderBufferTooSmallException {
+ byte flag = source.readByte();
+ int prevKeyLength = state.keyLength;
+
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ state.keyLength = ByteBufferUtils.readCompressedInt(source);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ state.valueLength = ByteBufferUtils.readCompressedInt(source);
+ }
+ int commonLength = ByteBufferUtils.readCompressedInt(source);
+
+ ByteBufferUtils.ensureSpace(buffer, state.keyLength + state.valueLength +
+ KeyValue.ROW_OFFSET);
+
+ int kvPos = buffer.position();
+
+ if (!state.isFirst()) {
+ // copy the prefix
+ int common;
+ int prevOffset;
+
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ buffer.putInt(state.keyLength);
+ buffer.putInt(state.valueLength);
+ prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+ common = commonLength;
+ } else {
+ if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
+ prevOffset = state.prevOffset;
+ common = commonLength + KeyValue.ROW_OFFSET;
+ } else {
+ buffer.putInt(state.keyLength);
+ prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
+ common = commonLength + KeyValue.KEY_LENGTH_SIZE;
+ }
+ }
+
+ ByteBufferUtils.copyFromBuffer(buffer, buffer, prevOffset, common);
+
+ // copy the rest of the key from the buffer
+ int keyRestLength;
+ if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+ // omit the family part of the key, it is always the same
+ int rowWithSizeLength;
+ int rowRestLength;
+
+ // check length of row
+ if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
+ // not yet copied, do it now
+ ByteBufferUtils.copyFromStream(source, buffer,
+ KeyValue.ROW_LENGTH_SIZE - commonLength);
+
+ rowWithSizeLength = buffer.getShort(buffer.position() -
+ KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
+ rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
+ } else {
+ // already in kvBuffer, just read it
+ rowWithSizeLength = buffer.getShort(kvPos + KeyValue.ROW_OFFSET) +
+ KeyValue.ROW_LENGTH_SIZE;
+ rowRestLength = rowWithSizeLength - commonLength;
+ }
+
+ // copy the rest of row
+ ByteBufferUtils.copyFromStream(source, buffer, rowRestLength);
+
+ // copy the column family
+ ByteBufferUtils.copyFromBuffer(buffer, buffer,
+ state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
+ + state.rowLength, state.familyLength
+ + KeyValue.FAMILY_LENGTH_SIZE);
+ state.rowLength = (short) (rowWithSizeLength -
+ KeyValue.ROW_LENGTH_SIZE);
+
+ keyRestLength = state.keyLength - rowWithSizeLength -
+ state.familyLength -
+ (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
+ } else {
+ // the row (with its length prefix) is the same as in the previous KeyValue
+ keyRestLength = state.keyLength - commonLength -
+ KeyValue.TIMESTAMP_TYPE_SIZE;
+ }
+ // copy the rest of the key after the column family (the qualifier)
+ ByteBufferUtils.copyFromStream(source, buffer, keyRestLength);
+
+ // copy timestamp
+ int prefixTimestamp =
+ (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
+ ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevTimestampOffset,
+ prefixTimestamp);
+ state.prevTimestampOffset = buffer.position() - prefixTimestamp;
+ ByteBufferUtils.copyFromStream(source, buffer, KeyValue.TIMESTAMP_SIZE
+ - prefixTimestamp);
+
+ // copy the type and value
+ if ((flag & FLAG_SAME_TYPE) != 0) {
+ buffer.put(state.type);
+ if ((flag & FLAG_SAME_VALUE) != 0) {
+ ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset +
+ KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
+ } else {
+ ByteBufferUtils.copyFromStream(source, buffer, state.valueLength);
+ }
+ } else {
+ if ((flag & FLAG_SAME_VALUE) != 0) {
+ ByteBufferUtils.copyFromStream(source, buffer, KeyValue.TYPE_SIZE);
+ ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset +
+ KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
+ } else {
+ ByteBufferUtils.copyFromStream(source, buffer,
+ state.valueLength + KeyValue.TYPE_SIZE);
+ }
+ state.type = buffer.get(state.prevTimestampOffset +
+ KeyValue.TIMESTAMP_SIZE);
+ }
+ } else { // is first element
+ buffer.putInt(state.keyLength);
+ buffer.putInt(state.valueLength);
+
+ state.prevTimestampOffset = buffer.position() + state.keyLength -
+ KeyValue.TIMESTAMP_TYPE_SIZE;
+ ByteBufferUtils.copyFromStream(source, buffer, state.keyLength
+ + state.valueLength);
+ state.rowLength = buffer.getShort(kvPos + KeyValue.ROW_OFFSET);
+ state.familyLength = buffer.get(kvPos + KeyValue.ROW_OFFSET +
+ KeyValue.ROW_LENGTH_SIZE + state.rowLength);
+ state.type = buffer.get(state.prevTimestampOffset +
+ KeyValue.TIMESTAMP_SIZE);
+ }
+
+ state.prevOffset = kvPos;
+ }
+
+ @Override
+ public void compressKeyValues(DataOutputStream out,
+ ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+ in.rewind();
+ ByteBufferUtils.putInt(out, in.limit());
+ FastDiffCompressionState previousState = new FastDiffCompressionState();
+ FastDiffCompressionState currentState = new FastDiffCompressionState();
+ while (in.hasRemaining()) {
+ compressSingleKeyValue(previousState, currentState,
+ out, in);
+ afterEncodingKeyValue(in, out, includesMemstoreTS);
+
+ // swap previousState <-> currentState
+ FastDiffCompressionState tmp = previousState;
+ previousState = currentState;
+ currentState = tmp;
+ }
+ }
+
+ @Override
+ public ByteBuffer uncompressKeyValues(DataInputStream source,
+ int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ throws IOException {
+ int decompressedSize = source.readInt();
+ ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+ allocHeaderLength);
+ buffer.position(allocHeaderLength);
+ FastDiffCompressionState state = new FastDiffCompressionState();
+ while (source.available() > skipLastBytes) {
+ uncompressSingleKeyValue(source, buffer, state);
+ afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+ }
+
+ if (source.available() != skipLastBytes) {
+ throw new IllegalStateException("Read too many bytes.");
+ }
+
+ return buffer;
+ }
+
+ @Override
+ public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+ block.mark();
+ block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
+ int keyLength = ByteBufferUtils.readCompressedInt(block);
+ ByteBufferUtils.readCompressedInt(block); // valueLength
+ ByteBufferUtils.readCompressedInt(block); // commonLength
+ int pos = block.position();
+ block.reset();
+ return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+ }
+
+ @Override
+ public String toString() {
+ return FastDiffDeltaEncoder.class.getSimpleName();
+ }
+
+ @Override
+ public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+ final boolean includesMemstoreTS) {
+ return new BufferedEncodedSeeker(comparator) {
+ private byte[] prevTimestampAndType = new byte[
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE];
+ private int rowLengthWithSize;
+ private int columnFamilyLengthWithSize;
+
+ private void decode(boolean isFirst) {
+ byte flag = currentBuffer.get();
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ if (!isFirst) {
+ System.arraycopy(current.keyBuffer,
+ current.keyLength - prevTimestampAndType.length,
+ prevTimestampAndType, 0,
+ prevTimestampAndType.length);
+ }
+ current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ current.valueLength =
+ ByteBufferUtils.readCompressedInt(currentBuffer);
+ }
+ current.lastCommonPrefix =
+ ByteBufferUtils.readCompressedInt(currentBuffer);
+
+ current.ensureSpaceForKey();
+
+ if (isFirst) {
+ // copy everything
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ current.keyLength - prevTimestampAndType.length);
+ rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+ Bytes.SIZEOF_SHORT;
+ columnFamilyLengthWithSize = current.keyBuffer[rowLengthWithSize] +
+ Bytes.SIZEOF_BYTE;
+ } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
+ // length of row is different, copy everything except family
+
+ // copy the row size
+ int oldRowLengthWithSize = rowLengthWithSize;
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
+ rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+ Bytes.SIZEOF_SHORT;
+
+ // move the column family
+ System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
+ current.keyBuffer, rowLengthWithSize,
+ columnFamilyLengthWithSize);
+
+ // copy the rest of row
+ currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
+ rowLengthWithSize - Bytes.SIZEOF_SHORT);
+
+ // copy the qualifier
+ currentBuffer.get(current.keyBuffer,
+ rowLengthWithSize + columnFamilyLengthWithSize,
+ current.keyLength - rowLengthWithSize -
+ columnFamilyLengthWithSize - prevTimestampAndType.length);
+ } else if (current.lastCommonPrefix < rowLengthWithSize) {
+ // we have to copy part of row and qualifier,
+ // but column family is in right place
+
+ // before column family (rest of row)
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ rowLengthWithSize - current.lastCommonPrefix);
+
+ // after column family (qualifier)
+ currentBuffer.get(current.keyBuffer,
+ rowLengthWithSize + columnFamilyLengthWithSize,
+ current.keyLength - rowLengthWithSize -
+ columnFamilyLengthWithSize - prevTimestampAndType.length);
+ } else {
+ // copy just the ending
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ current.keyLength - prevTimestampAndType.length -
+ current.lastCommonPrefix);
+ }
+
+ // timestamp
+ int pos = current.keyLength - prevTimestampAndType.length;
+ int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
+ SHIFT_TIMESTAMP_LENGTH;
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ System.arraycopy(prevTimestampAndType, 0, current.keyBuffer,
+ pos, commonTimestampPrefix);
+ }
+ pos += commonTimestampPrefix;
+ currentBuffer.get(current.keyBuffer, pos,
+ Bytes.SIZEOF_LONG - commonTimestampPrefix);
+ pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
+
+ // type
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
+ } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ current.keyBuffer[pos] = prevTimestampAndType[Bytes.SIZEOF_LONG];
+ }
+
+ // handle value
+ if ((flag & FLAG_SAME_VALUE) == 0) {
+ current.valueOffset = currentBuffer.position();
+ ByteBufferUtils.skip(currentBuffer, current.valueLength);
+ }
+
+ if (includesMemstoreTS) {
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ } else {
+ current.memstoreTS = 0;
+ }
+ current.nextKvOffset = currentBuffer.position();
+ }
+
+ @Override
+ protected void decodeFirst() {
+ ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+ decode(true);
+ }
+
+ @Override
+ protected void decodeNext() {
+ decode(false);
+ }
+ };
+ }
+}
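
The timestamp handling above relies on nearby timestamps sharing leading
bytes when written big-endian. A standalone sketch of the idea (the helper
mirrors findCommonTimestampPrefix; the values are made up):

import java.nio.ByteBuffer;

public class TimestampPrefixDemo {
  // At most 7 shared bytes, so the length always fits in the 3 flag bits.
  static int commonPrefix(byte[] a, byte[] b) {
    int p = 0;
    while (p < 7 && a[p] == b[p]) {
      p++;
    }
    return p;
  }

  public static void main(String[] args) {
    byte[] prev = ByteBuffer.allocate(8).putLong(1324760439000L).array();
    byte[] curr = ByteBuffer.allocate(8).putLong(1324760439010L).array();
    int prefix = commonPrefix(prev, curr);
    System.out.println("shared prefix bytes: " + prefix
        + ", timestamp bytes actually stored: " + (8 - prefix));
  }
}

For two writes 10 ms apart, only the last byte of the 8-byte timestamp
differs, so a single byte is stored.
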
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Compress the key by storing the size of the common prefix with the
+ * previous KeyValue and storing the rest of the key unencoded.
+ *
+ * Format:
+ * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
+ * 1-5 bytes: compressed value length (7-bit encoding)
+ * 1-3 bytes: compressed length of common key prefix
+ * ... bytes: rest of key (including timestamp)
+ * ... bytes: value
+ *
+ * In the worst case, a compressed KeyValue is three bytes longer than the original.
+ *
+ */
+public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
+
+ private int addKv(int offset, DataOutputStream out,
+ ByteBuffer in, int prevKeyLength) throws IOException {
+ int keyLength = in.getInt();
+ int valueLength = in.getInt();
+
+ if (offset == -1) {
+ // copy the whole key; there is no previous key to share a prefix with
+ ByteBufferUtils.putCompressedInt(out, keyLength);
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ ByteBufferUtils.putCompressedInt(out, 0);
+ ByteBufferUtils.copyToStream(out, in, keyLength + valueLength);
+ } else {
+ // find a common prefix and skip it
+ int common = ByteBufferUtils.findCommonPrefix(
+ in, offset + KeyValue.ROW_OFFSET,
+ in.position(),
+ Math.min(prevKeyLength, keyLength));
+
+ ByteBufferUtils.putCompressedInt(out, keyLength - common);
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ ByteBufferUtils.putCompressedInt(out, common);
+
+ ByteBufferUtils.skip(in, common);
+ ByteBufferUtils.copyToStream(out, in, keyLength - common + valueLength);
+ }
+
+ return keyLength;
+ }
+
+ @Override
+ public void compressKeyValues(DataOutputStream writeHere,
+ ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+ in.rewind();
+ ByteBufferUtils.putInt(writeHere, in.limit());
+ int prevOffset = -1;
+ int offset = 0;
+ int keyLength = 0;
+ while (in.hasRemaining()) {
+ offset = in.position();
+ keyLength = addKv(prevOffset, writeHere, in, keyLength);
+ afterEncodingKeyValue(in, writeHere, includesMemstoreTS);
+ prevOffset = offset;
+ }
+ }
+
+ @Override
+ public ByteBuffer uncompressKeyValues(DataInputStream source,
+ int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ throws IOException {
+ int decompressedSize = source.readInt();
+ ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+ allocHeaderLength);
+ buffer.position(allocHeaderLength);
+ int prevKeyOffset = 0;
+
+ while (source.available() > skipLastBytes) {
+ prevKeyOffset = uncompressKeyValue(source, buffer, prevKeyOffset);
+ afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+ }
+
+ if (source.available() != skipLastBytes) {
+ throw new IllegalStateException("Read too many bytes.");
+ }
+
+ buffer.limit(buffer.position());
+ return buffer;
+ }
+
+ private int uncompressKeyValue(DataInputStream source, ByteBuffer buffer,
+ int prevKeyOffset)
+ throws IOException, EncoderBufferTooSmallException {
+ int keyLength = ByteBufferUtils.readCompressedInt(source);
+ int valueLength = ByteBufferUtils.readCompressedInt(source);
+ int commonLength = ByteBufferUtils.readCompressedInt(source);
+ int keyOffset;
+ keyLength += commonLength;
+
+ ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
+ + KeyValue.ROW_OFFSET);
+
+ buffer.putInt(keyLength);
+ buffer.putInt(valueLength);
+
+ // copy the prefix
+ if (commonLength > 0) {
+ keyOffset = buffer.position();
+ ByteBufferUtils.copyFromBuffer(buffer, buffer, prevKeyOffset,
+ commonLength);
+ } else {
+ keyOffset = buffer.position();
+ }
+
+ // copy rest of the key and value
+ int len = keyLength - commonLength + valueLength;
+ ByteBufferUtils.copyFromStream(source, buffer, len);
+ return keyOffset;
+ }
+
+ @Override
+ public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+ block.mark();
+ block.position(Bytes.SIZEOF_INT);
+ int keyLength = ByteBufferUtils.readCompressedInt(block);
+ ByteBufferUtils.readCompressedInt(block);
+ int commonLength = ByteBufferUtils.readCompressedInt(block);
+ if (commonLength != 0) {
+ throw new AssertionError("Nonzero common length in the first key in "
+ + "block: " + commonLength);
+ }
+ int pos = block.position();
+ block.reset();
+ return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+ }
+
+ @Override
+ public String toString() {
+ return PrefixKeyDeltaEncoder.class.getSimpleName();
+ }
+
+ @Override
+ public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+ final boolean includesMemstoreTS) {
+ return new BufferedEncodedSeeker(comparator) {
+ @Override
+ protected void decodeNext() {
+ current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.lastCommonPrefix =
+ ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.keyLength += current.lastCommonPrefix;
+ current.ensureSpaceForKey();
+ currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+ current.keyLength - current.lastCommonPrefix);
+ current.valueOffset = currentBuffer.position();
+ ByteBufferUtils.skip(currentBuffer, current.valueLength);
+ if (includesMemstoreTS) {
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ } else {
+ current.memstoreTS = 0;
+ }
+ current.nextKvOffset = currentBuffer.position();
+ }
+
+ @Override
+ protected void decodeFirst() {
+ ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+ decodeNext();
+ }
+ };
+ }
+}
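
A worked example of the arithmetic in the prefix scheme above, on two flat
ASCII key arrays (illustration only; it does not invoke the encoder):

import java.nio.charset.StandardCharsets;

public class PrefixKeyDemo {
  public static void main(String[] args) {
    byte[] prev = "row0001/cf:qual".getBytes(StandardCharsets.US_ASCII);
    byte[] curr = "row0002/cf:qual".getBytes(StandardCharsets.US_ASCII);
    int common = 0;
    int max = Math.min(prev.length, curr.length);
    while (common < max && prev[common] == curr[common]) {
      common++;
    }
    // on the wire: varint(keyLength - common), varint(valueLength),
    // varint(common), then the uncommon key bytes and the value
    System.out.println("key length: " + curr.length
        + ", common prefix: " + common
        + ", key bytes written: " + (curr.length - common));
  }
}

Here common is 6 ("row000"), so 9 of the 15 key bytes are written plus
three small varints, matching the worst-case bound of three extra bytes
noted in the class comment.
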
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Sat Dec 24 21:20:39 2011
@@ -1,4 +1,4 @@
-/*
+ /*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.io.RawComparator;
@@ -60,6 +59,12 @@ public abstract class AbstractHFileReade
/** Filled when we read in the trailer. */
protected final Compression.Algorithm compressAlgo;
+ /**
+ * The data block encoding to use for reading, writing and cache handling.
+ */
+ protected final HFileDataBlockEncoder blockEncoder;
+
/** Last key in the file. Filled in when we read in the file info */
protected byte [] lastKey = null;
@@ -93,7 +98,8 @@ public abstract class AbstractHFileReade
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long fileSize,
final boolean closeIStream,
- final CacheConfig cacheConf) {
+ final CacheConfig cacheConf,
+ final HFileDataBlockEncoder dataBlockEncoder) {
super(null, path);
this.trailer = trailer;
this.compressAlgo = trailer.getCompressionCodec();
@@ -101,6 +107,8 @@ public abstract class AbstractHFileReade
this.fileSize = fileSize;
this.istream = fsdis;
this.closeIStream = closeIStream;
+ this.blockEncoder = dataBlockEncoder != null
+ ? dataBlockEncoder : new NoOpDataBlockEncoder();
this.path = path;
this.name = path.getName();
}
@@ -275,8 +283,11 @@ public abstract class AbstractHFileReade
protected int blockFetches;
- public Scanner(final boolean cacheBlocks,
+ protected final HFile.Reader reader;
+
+ public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
+ this.reader = reader;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
this.isCompaction = isCompaction;
@@ -296,6 +307,26 @@ public abstract class AbstractHFileReade
if (!isSeeked())
throw new NotSeekedException();
}
+
+ @Override
+ public int seekTo(byte[] key) throws IOException {
+ return seekTo(key, 0, key.length);
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key) throws IOException {
+ return seekBefore(key, 0, key.length);
+ }
+
+ @Override
+ public int reseekTo(byte[] key) throws IOException {
+ return reseekTo(key, 0, key.length);
+ }
+
+ @Override
+ public HFile.Reader getReader() {
+ return reader;
+ }
}
/** For testing */
@@ -306,5 +337,4 @@ public abstract class AbstractHFileReade
public Path getPath() {
return path;
}
-
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Sat Dec 24 21:20:39 2011
@@ -83,6 +83,9 @@ public abstract class AbstractHFileWrite
/** The compression algorithm used. NONE if no compression. */
protected final Compression.Algorithm compressAlgo;
+
+ /** The data block encoding which will be used. NONE if there is no encoding */
+ protected final HFileDataBlockEncoder blockEncoder;
/** First key in a block. */
protected byte[] firstKeyInBlock = null;
@@ -102,7 +105,9 @@ public abstract class AbstractHFileWrite
public AbstractHFileWriter(CacheConfig cacheConf,
FSDataOutputStream outputStream, Path path, int blockSize,
- Compression.Algorithm compressAlgo, KeyComparator comparator) {
+ Compression.Algorithm compressAlgo,
+ HFileDataBlockEncoder dataBlockEncoder,
+ KeyComparator comparator) {
super(null, path);
this.outputStream = outputStream;
this.path = path;
@@ -110,6 +115,8 @@ public abstract class AbstractHFileWrite
this.blockSize = blockSize;
this.compressAlgo = compressAlgo == null
? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo;
+ this.blockEncoder = dataBlockEncoder != null
+ ? dataBlockEncoder : new NoOpDataBlockEncoder();
this.comparator = comparator != null ? comparator
: Bytes.BYTES_RAWCOMPARATOR;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java Sat Dec 24 21:20:39 2011
@@ -39,6 +39,14 @@ public enum BlockType {
/** Data block, both versions */
DATA("DATABLK*", BlockCategory.DATA),
+ /** An encoded data block (e.g. with prefix compression), version 2 */
+ ENCODED_DATA("DATABLKE", BlockCategory.DATA) {
+ @Override
+ public int getId() {
+ return DATA.ordinal();
+ }
+ },
+
/** Version 2 leaf index block. Appears in the data block section */
LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX),
@@ -102,6 +110,15 @@ public enum BlockType {
this.metricCat = metricCat;
assert magic.length == MAGIC_LENGTH;
}
+
+ /**
+ * Use this instead of ordinal(). It works exactly the same, except that
+ * DATA and ENCODED_DATA share the same id.
+ * @return id between 0 and N
+ */
+ public int getId() {
+ return ordinal();
+ }
public void writeToStream(OutputStream out) throws IOException {
out.write(magic);
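
A minimal mirror of the getId() override above, showing an enum in which
one constant deliberately shares its id with another (the names are
illustrative, not from the commit):

public class BlockTypeIdDemo {
  enum T {
    DATA,
    ENCODED_DATA {
      @Override
      int id() { return DATA.ordinal(); }  // share DATA's id
    },
    INDEX;

    int id() { return ordinal(); }         // default: id == ordinal
  }

  public static void main(String[] args) {
    // prints "0 0 2": ENCODED_DATA reports DATA's id
    System.out.println(T.DATA.id() + " " + T.ENCODED_DATA.id()
        + " " + T.INDEX.id());
  }
}
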
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Sat Dec 24 21:20:39 2011
@@ -243,6 +243,7 @@ public class HFile {
public abstract Writer createWriter(FileSystem fs, Path path,
int blockSize, Compression.Algorithm compress,
+ HFileDataBlockEncoder dataBlockEncoder,
final KeyComparator comparator) throws IOException;
public abstract Writer createWriter(FileSystem fs, Path path,
@@ -371,32 +372,47 @@ public class HFile {
}
private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
- long size, boolean closeIStream, CacheConfig cacheConf)
+ long size, boolean closeIStream, CacheConfig cacheConf,
+ HFileDataBlockEncoder dataBlockEncoder)
throws IOException {
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
switch (trailer.getVersion()) {
case 1:
return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
- cacheConf);
+ cacheConf, dataBlockEncoder);
case 2:
return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
- cacheConf);
+ cacheConf, dataBlockEncoder);
default:
throw new IOException("Cannot instantiate reader for HFile version " +
trailer.getVersion());
}
}
+ public static Reader createReader(
+ FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
+ return createReader(fs, path, cacheConf,
+ new NoOpDataBlockEncoder());
+ }
+
+ public static Reader createReader(Path path, FSDataInputStream fsdis,
+ long size, CacheConfig cacheConf) throws IOException {
+ return createReader(path, fsdis, size, cacheConf,
+ new NoOpDataBlockEncoder());
+ }
+
public static Reader createReader(FileSystem fs, Path path,
- CacheConfig cacheConf) throws IOException {
+ CacheConfig cacheConf, HFileDataBlockEncoder dataBlockEncoder)
+ throws IOException {
return pickReaderVersion(path, fs.open(path),
- fs.getFileStatus(path).getLen(), true, cacheConf);
+ fs.getFileStatus(path).getLen(), true, cacheConf, dataBlockEncoder);
}
public static Reader createReader(Path path, FSDataInputStream fsdis,
- long size, CacheConfig cacheConf)
- throws IOException {
- return pickReaderVersion(path, fsdis, size, false, cacheConf);
+ long size, CacheConfig cacheConf,
+ HFileDataBlockEncoder dataBlockEncoder) throws IOException {
+ return pickReaderVersion(path, fsdis, size, false, cacheConf,
+ dataBlockEncoder);
}
/*