Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/23 23:58:44 UTC
svn commit: r1401499 [1/2] - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/io/encoding/
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hado...
Author: mbautin
Date: Tue Oct 23 21:58:43 2012
New Revision: 1401499
URL: http://svn.apache.org/viewvc?rev=1401499&view=rev
Log:
[jira] [HBASE-6597] [89-fb] Incremental Block Encoding
Author: mbautin
Summary:
Created an incremental encoding writer. HFileBlock.Writer now includes methods to support incremental encoding of data, and direct access to that class's data stream has been removed; key/value pairs should be added through a DataBlockEncoder.EncodedWriter.
Changed enum DataBlockEncoding.NONE to use CopyKeyDataBlockEncoder instead of a null encoder. This avoids code duplication.
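As an illustration (not part of this commit), the lifecycle of the new incremental writer looks roughly as follows, based on the DataBlockEncoder.EncodedWriter interface in the diff below. The single key/value pair and the DataBlockEncoding.FAST_DIFF.getEncoder() accessor are assumptions of the sketch; the real caller is HFileBlock.Writer.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IncrementalEncodingSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        // Assumed accessor for the enum's encoder instance.
        DataBlockEncoder encoder = DataBlockEncoding.FAST_DIFF.getEncoder();
        DataBlockEncoder.EncodedWriter writer = encoder.createWriter(out, false);
        writer.reserveMetadataSpace();  // reserves the slot for the unencoded size
        KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), 1L, Bytes.toBytes("v1"));
        // One update() call per key/value pair; memstoreTS is unused here
        // because the writer was created with includesMemstoreTS == false.
        writer.update(0L, kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
            kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
        out.flush();
        byte[] block = baos.toByteArray();
        // Back-fills the reserved metadata slot with the unencoded size.
        boolean encoded = writer.finishEncoding(block, 0, block.length);
        System.out.println("encoded=" + encoded + ", bytes=" + block.length);
      }
    }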
Test Plan:
Added new test file io/hfile/TestIncrementalEncoding.java.
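The test's contents are not included in this message; as a sketch of the invariant incremental encoding must preserve, batch and incremental encoding of the same data should produce identical bytes. Continuing the sketch above (java.nio.ByteBuffer and org.junit.Assert assumed on the classpath):

    // Batch-encode the same KeyValue through the whole-buffer path...
    ByteArrayOutputStream batchBaos = new ByteArrayOutputStream();
    DataOutputStream batchOut = new DataOutputStream(batchBaos);
    java.nio.ByteBuffer kvBuffer = java.nio.ByteBuffer.wrap(
        kv.getBuffer(), kv.getOffset(), kv.getLength()).slice();
    encoder.encodeKeyValues(batchOut, kvBuffer, false);
    batchOut.flush();
    // ...and expect the same bytes as the incremental writer produced.
    org.junit.Assert.assertArrayEquals(batchBaos.toByteArray(), block);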
Reviewers: kannan
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D554523
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
- copied, changed from r1401280, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestIncrementalEncoding.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Oct 23 21:58:43 2012
@@ -2177,4 +2177,15 @@ public class KeyValue implements Writabl
out.writeInt(this.length);
out.write(this.bytes, this.offset, this.length);
}
-}
\ No newline at end of file
+
+ /**
+ * Returns the size of a key/value pair in bytes
+ * @param keyLength length of the key in bytes
+ * @param valueLength length of the value in bytes
+ * @return key/value pair size in bytes
+ */
+ public static int getKVSize(final int keyLength,
+ final int valueLength) {
+ return ROW_OFFSET + keyLength + valueLength;
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -35,12 +36,154 @@ abstract class BufferedDataBlockEncoder
private static int INITIAL_KEY_BUFFER_SIZE = 512;
+ protected static int getCommonPrefixLength(byte[] a, int aOffset, int aLength,
+ byte[] b, int bOffset, int bLength) {
+ if (a == null || b == null) {
+ return 0;
+ }
+
+ int common = 0;
+ int minLength = Math.min(aLength, bLength);
+ while (common < minLength && a[aOffset + common] == b[bOffset + common]) {
+ common++;
+ }
+ return common;
+ }
+
@Override
- public ByteBuffer decodeKeyValues(DataInputStream source,
- boolean includesMemstoreTS) throws IOException {
- return decodeKeyValues(source, 0, 0, includesMemstoreTS);
+ public void encodeKeyValues(DataOutputStream out,
+ ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+ in.rewind();
+ if (shouldWriteUnencodedLength()) {
+ out.writeInt(in.remaining());
+ }
+ BufferedEncodedWriter writer = createWriter(out, includesMemstoreTS);
+ byte[] inputBytes = in.array();
+ int inputOffset = in.arrayOffset();
+
+ while (in.hasRemaining()) {
+ int keyLength = in.getInt();
+ int valueLength = in.getInt();
+ int inputPos = in.position(); // This is the buffer position after key/value length.
+ int keyOffset = inputOffset + inputPos;
+ int valueOffset = keyOffset + keyLength;
+
+ writer.updateInitial(inputBytes, keyOffset, keyLength, inputBytes, valueOffset, valueLength);
+ in.position(inputPos + keyLength + valueLength);
+
+ long memstoreTS = 0;
+ if (includesMemstoreTS) {
+ memstoreTS = ByteBufferUtils.readVLong(in);
+ }
+ writer.finishAddingKeyValue(memstoreTS, inputBytes, keyOffset, keyLength, inputBytes,
+ valueOffset, valueLength);
+ }
+ }
+
+ abstract static class BufferedEncodedWriter<STATE extends EncodingState>
+ implements EncodedWriter {
+
+ protected int unencodedLength;
+ protected final DataOutputStream out;
+ protected final boolean includesMemstoreTS;
+
+ protected STATE currentState;
+ protected STATE prevState;
+
+ /**
+ * Starts updating the writer with a new key/value pair. Unlike {@link #update(long, byte[],
+ * int, int, byte[], int, int)}, does not take a memstore timestamp.
+ */
+ protected abstract void updateInitial(final byte[] key,
+ final int keyOffset, final int keyLength, final byte[] value,
+ final int valueOffset, final int valueLength) throws IOException;
+
+ @Override
+ public void update(final long memstoreTS, final byte[] key,
+ final int keyOffset, final int keyLength, final byte[] value,
+ final int valueOffset, final int valueLength) throws IOException {
+ updateInitial(key, keyOffset, keyLength, value, valueOffset, valueLength);
+ finishAddingKeyValue(memstoreTS, key, keyOffset, keyLength, value, valueOffset, valueLength);
+ }
+
+ protected void finishAddingKeyValue(long memstoreTS, byte[] key, int keyOffset, int keyLength,
+ byte[] value, int valueOffset, int valueLength) throws IOException {
+ if (this.includesMemstoreTS) {
+ WritableUtils.writeVLong(this.out, memstoreTS);
+ unencodedLength += WritableUtils.getVIntSize(memstoreTS);
+ }
+ this.unencodedLength += KeyValue.getKVSize(keyLength, valueLength);
+
+ // Encoder state management.
+ // state may be null for the no-op encoder.
+ if (currentState != null) {
+ if (prevState == null) {
+ // Initially prevState is null, let us initialize it.
+ prevState = currentState;
+ currentState = createState();
+ } else {
+ // Both previous and current states exist. Swap them.
+ STATE tmp = currentState;
+ currentState = prevState;
+ prevState = tmp;
+ }
+
+ prevState.key = key;
+ prevState.value = value;
+ prevState.keyOffset = keyOffset;
+ prevState.valueOffset = valueOffset;
+ prevState.keyLength = keyLength;
+ prevState.valueLength = valueLength;
+ }
+ }
+
+ /**
+ * Creates a writer that sends all encoded data to a single stream and
+ * applies one consistent setting for whether each key/value pair is
+ * followed by a memstore timestamp.
+ */
+ public BufferedEncodedWriter(DataOutputStream out, boolean includesMemstoreTS)
+ throws IOException {
+ Preconditions.checkNotNull(out);
+ this.unencodedLength = 0;
+ this.out = out;
+ this.includesMemstoreTS = includesMemstoreTS;
+ currentState = createState();
+ if (currentState == null &&
+ (Class) getClass() != CopyKeyDataBlockEncoder.UnencodedWriter.class) {
+ throw new NullPointerException("Encoder state is null for " + getClass().getName());
+ }
+ }
+
+ /**
+ * Reserves space for necessary metadata, such as unencoded size, in the encoded block. Called
+ * before we start writing encoded data.
+ */
+ @Override
+ public void reserveMetadataSpace() throws IOException {
+ out.writeInt(0);
+ }
+
+ /**
+ * @param data a byte array containing encoded data
+ * @param offset offset of encoded data in the given array
+ * @param length length of encoded data in the given array
+ * @return true if the resulting block is stored in the "encoded block" format
+ */
+ @Override
+ public boolean finishEncoding(byte[] data, final int offset, final int length)
+ throws IOException {
+ // Add unencoded length to front of array. This only happens for encoded blocks.
+ ByteBufferUtils.putInt(data, offset, length, this.unencodedLength);
+ return true;
+ }
+
+ abstract STATE createState();
}
+ @Override
+ public abstract BufferedEncodedWriter createWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException;
+
protected static class SeekerState {
protected int valueOffset = -1;
protected int keyLength;
@@ -299,4 +442,14 @@ abstract class BufferedDataBlockEncoder
}
}
+ /** Whether the unencoded data length should be stored at the beginning of an encoded block */
+ boolean shouldWriteUnencodedLength() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java Tue Oct 23 21:58:43 2012
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.encoding;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-
-/**
- * Stores the state of data block encoder at the beginning of new key.
- */
-class CompressionState {
- int keyLength;
- int valueLength;
-
- short rowLength;
- int prevOffset = FIRST_KEY;
- byte familyLength;
- int qualifierLength;
- byte type;
-
- private final static int FIRST_KEY = -1;
-
- boolean isFirst() {
- return prevOffset == FIRST_KEY;
- }
-
- /**
- * Analyze the key and fill the state.
- * Uses mark() and reset() in ByteBuffer.
- * @param in Buffer at the position where key starts
- * @param keyLength Length of key in bytes
- * @param valueLength Length of values in bytes
- */
- void readKey(ByteBuffer in, int keyLength, int valueLength) {
- readKey(in, keyLength, valueLength, 0, null);
- }
-
- /**
- * Analyze the key and fill the state assuming we know previous state.
- * Uses mark() and reset() in ByteBuffer to avoid moving the position.
- * <p>
- * This method overrides all the fields of this instance, except
- * {@link #prevOffset}, which is usually manipulated directly by encoders
- * and decoders.
- * @param in Buffer at the position where key starts
- * @param keyLength Length of key in bytes
- * @param valueLength Length of values in bytes
- * @param commonPrefix how many first bytes are common with previous KeyValue
- * @param previousState State from previous KeyValue
- */
- void readKey(ByteBuffer in, int keyLength, int valueLength,
- int commonPrefix, CompressionState previousState) {
- this.keyLength = keyLength;
- this.valueLength = valueLength;
-
- // fill the state
- in.mark(); // mark beginning of key
-
- if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
- rowLength = in.getShort();
- ByteBufferUtils.skip(in, rowLength);
-
- familyLength = in.get();
-
- qualifierLength = keyLength - rowLength - familyLength -
- KeyValue.KEY_INFRASTRUCTURE_SIZE;
- ByteBufferUtils.skip(in, familyLength + qualifierLength);
- } else {
- rowLength = previousState.rowLength;
- familyLength = previousState.familyLength;
- qualifierLength = previousState.qualifierLength +
- keyLength - previousState.keyLength;
- ByteBufferUtils.skip(in, (KeyValue.ROW_LENGTH_SIZE +
- KeyValue.FAMILY_LENGTH_SIZE) +
- rowLength + familyLength + qualifierLength);
- }
-
- readTimestamp(in);
-
- type = in.get();
-
- in.reset();
- }
-
- protected void readTimestamp(ByteBuffer in) {
- // used in subclasses to add timestamp to state
- ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_SIZE);
- }
-
- void copyFrom(CompressionState state) {
- keyLength = state.keyLength;
- valueLength = state.valueLength;
-
- rowLength = state.rowLength;
- prevOffset = state.prevOffset;
- familyLength = state.familyLength;
- qualifierLength = state.qualifierLength;
- type = state.type;
- }
-}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -26,45 +26,84 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.io.RawComparator;
/**
- * Just copy data, do not do any kind of compression. Use for comparison and
- * benchmarking.
+ * This "encoder" implementation is used for the case when encoding is turned
+ * off. Unencoded block headers are exactly the same as they were before the
+ * data block encoding feature was introduced (no special fields for unencoded
+ * size or encoding type).
*/
public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
@Override
public void encodeKeyValues(DataOutputStream out,
ByteBuffer in, boolean includesMemstoreTS) throws IOException {
in.rewind();
- ByteBufferUtils.putInt(out, in.limit());
- ByteBufferUtils.moveBufferToStream(out, in, in.limit());
+ ByteBufferUtils.moveBufferToStream(out, in, in.remaining());
}
@Override
public ByteBuffer decodeKeyValues(DataInputStream source,
- int preserveHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ int preserveHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
throws IOException {
- int decompressedSize = source.readInt();
- ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+ // Encoded size and decoded size are the same here
+ ByteBuffer buffer = ByteBuffer.allocate(totalEncodedSize +
preserveHeaderLength);
buffer.position(preserveHeaderLength);
- ByteBufferUtils.copyFromStreamToBuffer(buffer, source, decompressedSize);
+ ByteBufferUtils.copyFromStreamToBuffer(buffer, source, totalEncodedSize);
return buffer;
}
@Override
public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
- int keyLength = block.getInt(Bytes.SIZEOF_INT);
+ int keyLength = block.getInt(0);
return ByteBuffer.wrap(block.array(),
- block.arrayOffset() + 3 * Bytes.SIZEOF_INT, keyLength).slice();
+ block.arrayOffset() + 2 * Bytes.SIZEOF_INT, keyLength).slice();
}
-
@Override
public String toString() {
return CopyKeyDataBlockEncoder.class.getSimpleName();
}
@Override
+ public UnencodedWriter createWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ return new UnencodedWriter(out, includesMemstoreTS);
+ }
+
+ static class UnencodedWriter extends BufferedEncodedWriter<EncodingState> {
+ public UnencodedWriter(DataOutputStream out, boolean includesMemstoreTS)
+ throws IOException {
+ super(out, includesMemstoreTS);
+ }
+
+ @Override
+ public void updateInitial(final byte[] key,
+ final int keyOffset, final int keyLength, final byte[] value,
+ final int valueOffset, final int valueLength) throws IOException {
+ // Write out the unencoded data
+ out.writeInt(keyLength);
+ out.writeInt(valueLength);
+ this.out.write(key, keyOffset, keyLength);
+ this.out.write(value, valueOffset, valueLength);
+ }
+
+ @Override
+ public void reserveMetadataSpace() {
+ // We are not storing unencoded block length for unencoded blocks.
+ }
+
+ @Override
+ public boolean finishEncoding(byte[] data, int offset, int length) throws IOException {
+ return false; // no encoding
+ }
+
+ @Override
+ EncodingState createState() {
+ return null;
+ }
+ }
+
+ @Override
public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
final boolean includesMemstoreTS) {
return new BufferedEncodedSeeker<SeekerState>(comparator) {
@@ -86,10 +125,15 @@ public class CopyKeyDataBlockEncoder ext
@Override
protected void decodeFirst() {
- ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
current.lastCommonPrefix = 0;
decodeNext();
}
};
}
+
+ @Override
+ boolean shouldWriteUnencodedLength() {
+ // We don't store unencoded length for unencoded blocks.
+ return false;
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -48,27 +48,16 @@ public interface DataBlockEncoder {
/**
* Decode.
- * @param source Encoded stream of KeyValues.
- * @param includesMemstoreTS true if including memstore timestamp after every
- * key-value pair
- * @return decoded block of KeyValues.
- * @throws IOException If there is an error in source.
- */
- public ByteBuffer decodeKeyValues(DataInputStream source,
- boolean includesMemstoreTS) throws IOException;
-
- /**
- * Decode.
* @param source encoded stream of KeyValues.
* @param allocateHeaderLength allocate this many bytes for the header.
- * @param skipLastBytes Do not copy n last bytes.
* @param includesMemstoreTS true if including memstore timestamp after every
* key-value pair
+ * @param totalEncodedSize the total size of the encoded data to read
* @return decoded block of KeyValues.
* @throws IOException If there is an error in source.
*/
public ByteBuffer decodeKeyValues(DataInputStream source,
- int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ int allocateHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
throws IOException;
/**
@@ -92,6 +81,54 @@ public interface DataBlockEncoder {
boolean includesMemstoreTS);
/**
+ * Creates an incremental writer.
+ * @param out Where to write encoded data
+ * @param includesMemstoreTS True if including memstore timestamp after every
+ * key-value pair
+ * @return a writer that encodes key/value pairs incrementally
+ */
+ public EncodedWriter createWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException;
+
+ /**
+ * An interface for performing incremental encoding
+ */
+ public static interface EncodedWriter {
+ /**
+ * Adds the next key/value pair to the writer, appending encoded data
+ * to the stream as necessary.
+ * @param memstoreTS Current memstore timestamp
+ * @param key Source of key bytes
+ * @param keyOffset Offset of initial byte in key array
+ * @param keyLength Length of key bytes in key array
+ * @param value Source of value bytes
+ * @param valueOffset Offset of initial byte in value array
+ * @param valueLength Length of value bytes in value array
+ * @throws IOException
+ */
+ public void update(final long memstoreTS, final byte[] key,
+ final int keyOffset, final int keyLength, final byte[] value,
+ final int valueOffset, final int valueLength) throws IOException;
+
+ /**
+ * Completes the encoding process on the given byte array
+ * @param data The encoded stream as a byte array
+ * @param offset Offset of initial byte in data array
+ * @param length Length of the portion of the array containing encoded data
+ * @return True if encoding was performed
+ * @throws IOException
+ */
+ public boolean finishEncoding(byte[] data, final int offset,
+ final int length) throws IOException;
+
+ /**
+ * Called before writing anything to the stream to reserve space for the necessary metadata
+ * such as unencoded length.
+ */
+ void reserveMetadataSpace() throws IOException;
+ }
+
+ /**
* An interface which enable to seek while underlying data is encoded.
*
* It works on one HFileBlock, but it is reusable. See
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java Tue Oct 23 21:58:43 2012
@@ -34,7 +34,18 @@ import org.apache.hadoop.hbase.util.Byte
public enum DataBlockEncoding {
/** Disable data block encoding. */
- NONE(0, null),
+ NONE(0, new CopyKeyDataBlockEncoder()) {
+ @Override
+ public int inBlockMetadataSize() {
+ return 0;
+ }
+
+ @Override
+ public int encodingIdSize() {
+ return 0;
+ }
+ },
+
// id 1 is reserved for the BITSET algorithm to be added later
PREFIX(2, new PrefixKeyDeltaEncoder()),
DIFF(3, new DiffKeyDeltaEncoder()),
@@ -177,4 +188,16 @@ public enum DataBlockEncoding {
return idToEncoding.get(dataBlockEncodingId);
}
+ /**
+ * Size of metadata inside the encoded block. This metadata is not considered part of the block
+ * header, but is part of the block payload. It does not include the encoding id.
+ */
+ public int inBlockMetadataSize() {
+ return Bytes.SIZEOF_INT; // unencoded size
+ }
+
+ public int encodingIdSize() {
+ return ID_SIZE;
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java Tue Oct 23 21:58:43 2012
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -28,9 +29,9 @@ import org.apache.hadoop.io.RawComparato
/**
* Compress using:
- * - store size of common prefix
+ * - store the size of common prefix
* - save column family once, it is same within HFile
- * - use integer compression for key, value and prefix (7-bit encoding)
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
* - use bits to avoid duplication key length, value length
* and type if it same as previous
* - store in 3 bits length of timestamp field
@@ -56,7 +57,7 @@ public class DiffKeyDeltaEncoder extends
static final int SHIFT_TIMESTAMP_LENGTH = 4;
static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
- protected static class DiffCompressionState extends CompressionState {
+ protected static class DiffEncodingState extends EncodingState {
long timestamp;
byte[] familyNameWithSize;
@@ -66,141 +67,16 @@ public class DiffKeyDeltaEncoder extends
}
@Override
- void copyFrom(CompressionState state) {
+ void copyFrom(EncodingState state) {
super.copyFrom(state);
- DiffCompressionState state2 = (DiffCompressionState) state;
+ DiffEncodingState state2 = (DiffEncodingState) state;
timestamp = state2.timestamp;
}
}
- private void compressSingleKeyValue(DiffCompressionState previousState,
- DiffCompressionState currentState, DataOutputStream out,
- ByteBuffer in) throws IOException {
- byte flag = 0;
- int kvPos = in.position();
- int keyLength = in.getInt();
- int valueLength = in.getInt();
-
- long timestamp;
- long diffTimestamp = 0;
- int diffTimestampFitsInBytes = 0;
-
- int commonPrefix;
-
- int timestampFitsInBytes;
-
- if (previousState.isFirst()) {
- currentState.readKey(in, keyLength, valueLength);
- currentState.prevOffset = kvPos;
- timestamp = currentState.timestamp;
- if (timestamp < 0) {
- flag |= FLAG_TIMESTAMP_SIGN;
- timestamp = -timestamp;
- }
- timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
- flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
- commonPrefix = 0;
-
- // put column family
- in.mark();
- ByteBufferUtils.skip(in, currentState.rowLength
- + KeyValue.ROW_LENGTH_SIZE);
- ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
- + KeyValue.FAMILY_LENGTH_SIZE);
- in.reset();
- } else {
- // find a common prefix and skip it
- commonPrefix =
- ByteBufferUtils.findCommonPrefix(in, in.position(),
- previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
- - KeyValue.TIMESTAMP_TYPE_SIZE);
- // don't compress timestamp and type using prefix
-
- currentState.readKey(in, keyLength, valueLength,
- commonPrefix, previousState);
- currentState.prevOffset = kvPos;
- timestamp = currentState.timestamp;
- boolean negativeTimestamp = timestamp < 0;
- if (negativeTimestamp) {
- timestamp = -timestamp;
- }
- timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
- if (keyLength == previousState.keyLength) {
- flag |= FLAG_SAME_KEY_LENGTH;
- }
- if (valueLength == previousState.valueLength) {
- flag |= FLAG_SAME_VALUE_LENGTH;
- }
- if (currentState.type == previousState.type) {
- flag |= FLAG_SAME_TYPE;
- }
-
- // encode timestamp
- diffTimestamp = previousState.timestamp - currentState.timestamp;
- boolean minusDiffTimestamp = diffTimestamp < 0;
- if (minusDiffTimestamp) {
- diffTimestamp = -diffTimestamp;
- }
- diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
- if (diffTimestampFitsInBytes < timestampFitsInBytes) {
- flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
- flag |= FLAG_TIMESTAMP_IS_DIFF;
- if (minusDiffTimestamp) {
- flag |= FLAG_TIMESTAMP_SIGN;
- }
- } else {
- flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
- if (negativeTimestamp) {
- flag |= FLAG_TIMESTAMP_SIGN;
- }
- }
- }
-
- out.write(flag);
-
- if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, keyLength);
- }
- if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, valueLength);
- }
-
- ByteBufferUtils.putCompressedInt(out, commonPrefix);
- ByteBufferUtils.skip(in, commonPrefix);
-
- if (previousState.isFirst() ||
- commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
- int restRowLength =
- currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
- ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
- ByteBufferUtils.skip(in, currentState.familyLength +
- KeyValue.FAMILY_LENGTH_SIZE);
- ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
- } else {
- ByteBufferUtils.moveBufferToStream(out, in,
- keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
- }
-
- if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
- ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
- } else {
- ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
- }
-
- if ((flag & FLAG_SAME_TYPE) == 0) {
- out.write(currentState.type);
- }
- ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
-
- ByteBufferUtils.moveBufferToStream(out, in, valueLength);
- }
-
private void uncompressSingleKeyValue(DataInputStream source,
- ByteBuffer buffer,
- DiffCompressionState state)
- throws IOException, EncoderBufferTooSmallException {
+ ByteBuffer buffer, DiffEncodingState state)
+ throws IOException, EncoderBufferTooSmallException {
// read the column family at the beginning
if (state.isFirst()) {
state.familyLength = source.readByte();
@@ -229,16 +105,14 @@ public class DiffKeyDeltaEncoder extends
}
int commonPrefix = ByteBufferUtils.readCompressedInt(source);
- // create KeyValue buffer and fill it prefix
int keyOffset = buffer.position();
- ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
- + KeyValue.ROW_OFFSET);
+ ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
buffer.putInt(keyLength);
buffer.putInt(valueLength);
// copy common from previous key
if (commonPrefix > 0) {
- ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
+ ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.keyOffset
+ KeyValue.ROW_OFFSET, commonPrefix);
}
@@ -305,49 +179,31 @@ public class DiffKeyDeltaEncoder extends
// copy value part
ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
- state.keyLength = keyLength;
- state.valueLength = valueLength;
- state.prevOffset = keyOffset;
+ state.keyOffset = keyOffset;
state.timestamp = timestamp;
state.type = type;
- // state.qualifier is unused
- }
-
- @Override
- public void encodeKeyValues(DataOutputStream out,
- ByteBuffer in, boolean includesMemstoreTS) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(out, in.limit());
- DiffCompressionState previousState = new DiffCompressionState();
- DiffCompressionState currentState = new DiffCompressionState();
- while (in.hasRemaining()) {
- compressSingleKeyValue(previousState, currentState,
- out, in);
- afterEncodingKeyValue(in, out, includesMemstoreTS);
-
- // swap previousState <-> currentState
- DiffCompressionState tmp = previousState;
- previousState = currentState;
- currentState = tmp;
- }
+ state.keyLength = keyLength;
+ state.valueLength = valueLength;
}
@Override
public ByteBuffer decodeKeyValues(DataInputStream source,
- int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
throws IOException {
+ int skipLastBytes = source.available() - totalEncodedSize;
+ Preconditions.checkState(skipLastBytes >= 0, "Requested to skip a negative number of bytes");
int decompressedSize = source.readInt();
ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
allocHeaderLength);
buffer.position(allocHeaderLength);
- DiffCompressionState state = new DiffCompressionState();
+ DiffEncodingState state = new DiffEncodingState();
while (source.available() > skipLastBytes) {
uncompressSingleKeyValue(source, buffer, state);
afterDecodingKeyValue(source, buffer, includesMemstoreTS);
}
if (source.available() != skipLastBytes) {
- throw new IllegalStateException("Read too much bytes.");
+ throw new IOException("Read too many bytes");
}
return buffer;
@@ -402,8 +258,143 @@ public class DiffKeyDeltaEncoder extends
}
@Override
- public String toString() {
- return DiffKeyDeltaEncoder.class.getSimpleName();
+ public DiffKeyDeltaEncoderWriter createWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ return new DiffKeyDeltaEncoderWriter(out, includesMemstoreTS);
+ }
+
+ /**
+ * A writer that incrementally performs Diff Key Delta Encoding
+ */
+ private static class DiffKeyDeltaEncoderWriter
+ extends BufferedEncodedWriter<DiffEncodingState> {
+
+ public DiffKeyDeltaEncoderWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ super(out, includesMemstoreTS);
+ }
+
+ @Override
+ DiffEncodingState createState() {
+ return new DiffEncodingState();
+ }
+
+ @Override
+ public void updateInitial(final byte[] key,
+ final int keyOffset, final int keyLength, final byte[] value,
+ final int valueOffset, final int valueLength) throws IOException {
+ ByteBuffer keyBuffer = ByteBuffer.wrap(key, keyOffset, keyLength);
+ long timestamp;
+ long diffTimestamp = 0;
+ int diffTimestampFitsInBytes = 0;
+ byte flag = 0;
+ int commonPrefix;
+ int timestampFitsInBytes;
+
+ if (this.prevState == null) {
+ currentState.readKey(keyBuffer, keyLength, valueLength);
+ timestamp = currentState.timestamp;
+ if (timestamp < 0) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ timestamp = -timestamp;
+ }
+ timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+ flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ commonPrefix = 0;
+
+ // put column family
+ this.out.write(key, keyOffset + currentState.rowLength
+ + KeyValue.ROW_LENGTH_SIZE, currentState.familyLength
+ + KeyValue.FAMILY_LENGTH_SIZE);
+ } else {
+ // find a common prefix and skip it
+ // don't compress timestamp and type using this prefix
+ commonPrefix = getCommonPrefixLength(key, keyOffset, keyLength -
+ KeyValue.TIMESTAMP_TYPE_SIZE, this.prevState.key,
+ this.prevState.keyOffset, this.prevState.keyLength -
+ KeyValue.TIMESTAMP_TYPE_SIZE);
+
+ currentState.readKey(keyBuffer, keyLength, valueLength,
+ commonPrefix, this.prevState);
+ timestamp = currentState.timestamp;
+ boolean negativeTimestamp = timestamp < 0;
+ if (negativeTimestamp) {
+ timestamp = -timestamp;
+ }
+ timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+ if (keyLength == this.prevState.keyLength) {
+ flag |= FLAG_SAME_KEY_LENGTH;
+ }
+ if (valueLength == this.prevState.valueLength) {
+ flag |= FLAG_SAME_VALUE_LENGTH;
+ }
+ if (currentState.type == this.prevState.type) {
+ flag |= FLAG_SAME_TYPE;
+ }
+
+ // encode timestamp
+ diffTimestamp = this.prevState.timestamp - currentState.timestamp;
+ boolean negativeDiffTimestamp = diffTimestamp < 0;
+ if (negativeDiffTimestamp) {
+ diffTimestamp = -diffTimestamp;
+ }
+ diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
+ if (diffTimestampFitsInBytes < timestampFitsInBytes) {
+ flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ flag |= FLAG_TIMESTAMP_IS_DIFF;
+ if (negativeDiffTimestamp) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ }
+ } else {
+ flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ if (negativeTimestamp) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ }
+ }
+ }
+
+ this.out.write(flag);
+
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(this.out, keyLength);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(this.out, valueLength);
+ }
+
+ ByteBufferUtils.putCompressedInt(this.out, commonPrefix);
+
+ if ((this.prevState == null) ||
+ commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+ int restRowLength =
+ currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+ this.out.write(key, keyOffset + commonPrefix, restRowLength);
+ this.out.write(key, keyOffset + commonPrefix + restRowLength +
+ currentState.familyLength + KeyValue.FAMILY_LENGTH_SIZE,
+ currentState.qualifierLength);
+ } else {
+ this.out.write(key, keyOffset + commonPrefix, keyLength -
+ commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
+ }
+
+ if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
+ ByteBufferUtils.putLong(this.out, timestamp, timestampFitsInBytes);
+ } else {
+ ByteBufferUtils.putLong(this.out, diffTimestamp,
+ diffTimestampFitsInBytes);
+ }
+
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ this.out.write(currentState.type);
+ }
+
+ this.out.write(value, valueOffset, valueLength);
+
+ this.currentState.key = key;
+ this.currentState.keyOffset = keyOffset;
+ }
}
protected static class DiffSeekerState extends SeekerState {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Tue Oct 23 21:58:43 2012
@@ -88,7 +88,7 @@ public class EncodedDataBlock {
if (decompressedData == null) {
try {
decompressedData = dataBlockEncoder.decodeKeyValues(
- dis, includesMemstoreTS);
+ dis, 0, includesMemstoreTS, dis.available());
} catch (IOException e) {
throw new RuntimeException("Problem with data block encoder, " +
"most likely it requested more bytes than are available.", e);
Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java (from r1401280, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java&r1=1401280&r2=1401499&rev=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java Tue Oct 23 21:58:43 2012
@@ -24,12 +24,16 @@ import org.apache.hadoop.hbase.util.Byte
/**
* Stores the state of data block encoder at the beginning of new key.
*/
-class CompressionState {
+class EncodingState {
+ byte[] key;
+ byte[] value;
+
int keyLength;
int valueLength;
+ int valueOffset = 0;
short rowLength;
- int prevOffset = FIRST_KEY;
+ int keyOffset = FIRST_KEY;
byte familyLength;
int qualifierLength;
byte type;
@@ -37,7 +41,7 @@ class CompressionState {
private final static int FIRST_KEY = -1;
boolean isFirst() {
- return prevOffset == FIRST_KEY;
+ return keyOffset == FIRST_KEY;
}
/**
@@ -51,12 +55,12 @@ class CompressionState {
readKey(in, keyLength, valueLength, 0, null);
}
- /**
+ /**
* Analyze the key and fill the state assuming we know previous state.
* Uses mark() and reset() in ByteBuffer to avoid moving the position.
* <p>
* This method overrides all the fields of this instance, except
- * {@link #prevOffset}, which is usually manipulated directly by encoders
+ * {@link #keyOffset}, which is usually manipulated directly by encoders
* and decoders.
* @param in Buffer at the position where key starts
* @param keyLength Length of key in bytes
@@ -65,7 +69,7 @@ class CompressionState {
* @param previousState State from previous KeyValue
*/
void readKey(ByteBuffer in, int keyLength, int valueLength,
- int commonPrefix, CompressionState previousState) {
+ int commonPrefix, EncodingState previousState) {
this.keyLength = keyLength;
this.valueLength = valueLength;
@@ -103,12 +107,12 @@ class CompressionState {
ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_SIZE);
}
- void copyFrom(CompressionState state) {
+ void copyFrom(EncodingState state) {
keyLength = state.keyLength;
valueLength = state.valueLength;
rowLength = state.rowLength;
- prevOffset = state.prevOffset;
+ keyOffset = state.keyOffset;
familyLength = state.familyLength;
qualifierLength = state.qualifierLength;
type = state.type;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Tue Oct 23 21:58:43 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encod
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.KeyValue;
@@ -33,7 +32,7 @@ import org.apache.hadoop.io.RawComparato
* Compress using:
* - store size of common prefix
* - save column family once in the first KeyValue
- * - use integer compression for key, value and prefix (7-bit encoding)
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
* - use bits to avoid duplication key length, value length
* and type if it same as previous
* - store in 3 bits length of prefix timestamp
@@ -53,14 +52,14 @@ import org.apache.hadoop.io.RawComparato
*
*/
public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
- final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
- final int SHIFT_TIMESTAMP_LENGTH = 0;
- final int FLAG_SAME_KEY_LENGTH = 1 << 3;
- final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
- final int FLAG_SAME_TYPE = 1 << 5;
- final int FLAG_SAME_VALUE = 1 << 6;
+ final static int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
+ final static int SHIFT_TIMESTAMP_LENGTH = 0;
+ final static int FLAG_SAME_KEY_LENGTH = 1 << 3;
+ final static int FLAG_SAME_VALUE_LENGTH = 1 << 4;
+ final static int FLAG_SAME_TYPE = 1 << 5;
+ final static int FLAG_SAME_VALUE = 1 << 6;
- private static class FastDiffCompressionState extends CompressionState {
+ private static class FastDiffEncodingState extends EncodingState {
byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
int prevTimestampOffset;
@@ -70,9 +69,9 @@ public class FastDiffDeltaEncoder extend
}
@Override
- void copyFrom(CompressionState state) {
+ void copyFrom(EncodingState state) {
super.copyFrom(state);
- FastDiffCompressionState state2 = (FastDiffCompressionState) state;
+ FastDiffEncodingState state2 = (FastDiffEncodingState) state;
System.arraycopy(state2.timestamp, 0, timestamp, 0,
KeyValue.TIMESTAMP_SIZE);
prevTimestampOffset = state2.prevTimestampOffset;
@@ -82,7 +81,7 @@ public class FastDiffDeltaEncoder extend
* Copies the first key/value from the given stream, and initializes
* decompression state based on it. Assumes that we have already read key
* and value lengths. Does not set {@link #qualifierLength} (not used by
- * decompression) or {@link #prevOffset} (set by the calle afterwards).
+ * decompression) or {@link #keyOffset} (set by the caller afterwards).
*/
private void decompressFirstKV(ByteBuffer out, DataInputStream in)
throws IOException {
@@ -100,111 +99,8 @@ public class FastDiffDeltaEncoder extend
}
- private void compressSingleKeyValue(
- FastDiffCompressionState previousState,
- FastDiffCompressionState currentState,
- OutputStream out, ByteBuffer in) throws IOException {
- currentState.prevOffset = in.position();
- int keyLength = in.getInt();
- int valueOffset =
- currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
- int valueLength = in.getInt();
- byte flag = 0;
-
- if (previousState.isFirst()) {
- // copy the key, there is no common prefix with none
- out.write(flag);
- ByteBufferUtils.putCompressedInt(out, keyLength);
- ByteBufferUtils.putCompressedInt(out, valueLength);
- ByteBufferUtils.putCompressedInt(out, 0);
-
- currentState.readKey(in, keyLength, valueLength);
-
- ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
- } else {
- // find a common prefix and skip it
- int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
- previousState.prevOffset + KeyValue.ROW_OFFSET,
- Math.min(keyLength, previousState.keyLength) -
- KeyValue.TIMESTAMP_TYPE_SIZE);
-
- currentState.readKey(in, keyLength, valueLength,
- commonPrefix, previousState);
-
- if (keyLength == previousState.keyLength) {
- flag |= FLAG_SAME_KEY_LENGTH;
- }
- if (valueLength == previousState.valueLength) {
- flag |= FLAG_SAME_VALUE_LENGTH;
- }
- if (currentState.type == previousState.type) {
- flag |= FLAG_SAME_TYPE;
- }
-
- int commonTimestampPrefix = findCommonTimestampPrefix(
- currentState, previousState);
- flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
-
- // Check if current and previous values are the same. Compare value
- // length first as an optimization.
- if (valueLength == previousState.valueLength) {
- int previousValueOffset = previousState.prevOffset
- + previousState.keyLength + KeyValue.ROW_OFFSET;
- if (ByteBufferUtils.arePartsEqual(in,
- previousValueOffset, previousState.valueLength,
- valueOffset, valueLength)) {
- flag |= FLAG_SAME_VALUE;
- }
- }
-
- out.write(flag);
- if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, keyLength);
- }
- if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, valueLength);
- }
- ByteBufferUtils.putCompressedInt(out, commonPrefix);
-
- ByteBufferUtils.skip(in, commonPrefix);
- if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
- // Previous and current rows are different. Copy the differing part of
- // the row, skip the column family, and copy the qualifier.
- ByteBufferUtils.moveBufferToStream(out, in,
- currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
- ByteBufferUtils.skip(in, currentState.familyLength +
- KeyValue.FAMILY_LENGTH_SIZE);
- ByteBufferUtils.moveBufferToStream(out, in,
- currentState.qualifierLength);
- } else {
- // The common part includes the whole row. As the column family is the
- // same across the whole file, it will automatically be included in the
- // common prefix, so we need not special-case it here.
- int restKeyLength = keyLength - commonPrefix -
- KeyValue.TIMESTAMP_TYPE_SIZE;
- ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
- }
- ByteBufferUtils.skip(in, commonTimestampPrefix);
- ByteBufferUtils.moveBufferToStream(out, in,
- KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
-
- // Write the type if it is not the same as before.
- if ((flag & FLAG_SAME_TYPE) == 0) {
- out.write(currentState.type);
- }
-
- // Write the value if it is not the same as before.
- if ((flag & FLAG_SAME_VALUE) == 0) {
- ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
- }
-
- // Skip key type and value in the input buffer.
- ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
- }
- }
-
- private int findCommonTimestampPrefix(FastDiffCompressionState left,
- FastDiffCompressionState right) {
+ private static int findCommonTimestampPrefix(FastDiffEncodingState left,
+ FastDiffEncodingState right) {
int prefixTimestamp = 0;
while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
left.timestamp[prefixTimestamp]
@@ -215,7 +111,7 @@ public class FastDiffDeltaEncoder extend
}
private void uncompressSingleKeyValue(DataInputStream source,
- ByteBuffer out, FastDiffCompressionState state)
+ ByteBuffer out, FastDiffEncodingState state)
throws IOException, EncoderBufferTooSmallException {
byte flag = source.readByte();
int prevKeyLength = state.keyLength;
@@ -241,15 +137,15 @@ public class FastDiffDeltaEncoder extend
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
out.putInt(state.keyLength);
out.putInt(state.valueLength);
- prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+ prevOffset = state.keyOffset + KeyValue.ROW_OFFSET;
common = commonLength;
} else {
if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
- prevOffset = state.prevOffset;
+ prevOffset = state.keyOffset;
common = commonLength + KeyValue.ROW_OFFSET;
} else {
out.putInt(state.keyLength);
- prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
+ prevOffset = state.keyOffset + KeyValue.KEY_LENGTH_SIZE;
common = commonLength + KeyValue.KEY_LENGTH_SIZE;
}
}
@@ -284,7 +180,7 @@ public class FastDiffDeltaEncoder extend
// copy the column family
ByteBufferUtils.copyFromBufferToBuffer(out, out,
- state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
+ state.keyOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
+ state.rowLength, state.familyLength
+ KeyValue.FAMILY_LENGTH_SIZE);
state.rowLength = (short) (rowWithSizeLength -
@@ -314,7 +210,7 @@ public class FastDiffDeltaEncoder extend
if ((flag & FLAG_SAME_TYPE) != 0) {
out.put(state.type);
if ((flag & FLAG_SAME_VALUE) != 0) {
- ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
+ ByteBufferUtils.copyFromBufferToBuffer(out, out, state.keyOffset +
KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
} else {
ByteBufferUtils.copyFromStreamToBuffer(out, source,
@@ -324,7 +220,7 @@ public class FastDiffDeltaEncoder extend
if ((flag & FLAG_SAME_VALUE) != 0) {
ByteBufferUtils.copyFromStreamToBuffer(out, source,
KeyValue.TYPE_SIZE);
- ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
+ ByteBufferUtils.copyFromBufferToBuffer(out, out, state.keyOffset +
KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
} else {
ByteBufferUtils.copyFromStreamToBuffer(out, source,
@@ -337,44 +233,25 @@ public class FastDiffDeltaEncoder extend
state.decompressFirstKV(out, source);
}
- state.prevOffset = kvPos;
- }
-
- @Override
- public void encodeKeyValues(DataOutputStream out,
- ByteBuffer in, boolean includesMemstoreTS) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(out, in.limit());
- FastDiffCompressionState previousState = new FastDiffCompressionState();
- FastDiffCompressionState currentState = new FastDiffCompressionState();
- while (in.hasRemaining()) {
- compressSingleKeyValue(previousState, currentState,
- out, in);
- afterEncodingKeyValue(in, out, includesMemstoreTS);
-
- // swap previousState <-> currentState
- FastDiffCompressionState tmp = previousState;
- previousState = currentState;
- currentState = tmp;
- }
+ state.keyOffset = kvPos;
}
@Override
public ByteBuffer decodeKeyValues(DataInputStream source,
- int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
throws IOException {
+ int skipLastBytes = source.available() - totalEncodedSize;
int decompressedSize = source.readInt();
- ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
- allocHeaderLength);
+ ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength);
buffer.position(allocHeaderLength);
- FastDiffCompressionState state = new FastDiffCompressionState();
+ FastDiffEncodingState state = new FastDiffEncodingState();
while (source.available() > skipLastBytes) {
uncompressSingleKeyValue(source, buffer, state);
afterDecodingKeyValue(source, buffer, includesMemstoreTS);
}
if (source.available() != skipLastBytes) {
- throw new IllegalStateException("Read too much bytes.");
+ throw new IOException("Read too many bytes");
}
return buffer;
@@ -393,8 +270,116 @@ public class FastDiffDeltaEncoder extend
}
@Override
- public String toString() {
- return FastDiffDeltaEncoder.class.getSimpleName();
+ public FastDiffDeltaEncoderWriter createWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ return new FastDiffDeltaEncoderWriter(out, includesMemstoreTS);
+ }
+
+ /**
+ * A writer that incrementally performs Fast Diff Delta Encoding
+ */
+ private static class FastDiffDeltaEncoderWriter
+ extends BufferedEncodedWriter<FastDiffEncodingState> {
+ public FastDiffDeltaEncoderWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ super(out, includesMemstoreTS);
+ }
+
+ @Override
+ FastDiffEncodingState createState() {
+ return new FastDiffEncodingState();
+ }
+
+ @Override
+ protected void updateInitial(byte[] key, int keyOffset, int keyLength,
+ byte[] value, int valueOffset, int valueLength) throws IOException {
+ byte flag = 0;
+ ByteBuffer keyBuffer = ByteBuffer.wrap(key, keyOffset, keyLength);
+
+ if (this.prevState == null) {
+ // The first element in the stream
+ out.write(flag);
+ ByteBufferUtils.putCompressedInt(out, keyLength);
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ ByteBufferUtils.putCompressedInt(out, 0);
+ out.write(key, keyOffset, keyLength);
+ out.write(value, valueOffset, valueLength);
+
+ // Initialize the compression state
+ currentState.readKey(keyBuffer, keyLength, valueLength);
+ } else {
+ // Find the common prefix to skip
+ int commonPrefix = getCommonPrefixLength(key, keyOffset, keyLength -
+ KeyValue.TIMESTAMP_TYPE_SIZE, this.prevState.key,
+ this.prevState.keyOffset, this.prevState.keyLength -
+ KeyValue.TIMESTAMP_TYPE_SIZE);
+
+ currentState.readKey(keyBuffer, keyLength, valueLength, commonPrefix,
+ this.prevState);
+
+ if (keyLength == this.prevState.keyLength) {
+ flag |= FLAG_SAME_KEY_LENGTH;
+ }
+ if (valueLength == this.prevState.valueLength) {
+ flag |= FLAG_SAME_VALUE_LENGTH;
+ }
+ if (currentState.type == this.prevState.type) {
+ flag |= FLAG_SAME_TYPE;
+ }
+
+ int commonTimestampPrefix = findCommonTimestampPrefix(currentState, this.prevState);
+ flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
+
+ // Check if current and previous values are the same. Compare value
+ // length first as an optimization.
+ if (valueLength == this.prevState.valueLength) {
+ if (valueLength == getCommonPrefixLength(value, valueOffset, valueLength,
+ this.prevState.value, this.prevState.valueOffset,
+ this.prevState.valueLength)) {
+ // the common prefix covers the whole value, so the values are equal
+ flag |= FLAG_SAME_VALUE;
+ }
+ }
+
+ out.write(flag);
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, keyLength);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, valueLength);
+ }
+ ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+ if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+ // Previous and current rows are different. Copy the differing part
+ // of the row, skip the column family, and copy the qualifier.
+ out.write(key, keyOffset + commonPrefix, currentState.rowLength +
+ KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+ out.write(key, keyOffset + currentState.familyLength +
+ KeyValue.FAMILY_LENGTH_SIZE + currentState.rowLength +
+ KeyValue.ROW_LENGTH_SIZE, currentState.qualifierLength);
+ } else {
+ // The common part includes the whole row. As the column family is
+ // the same across the whole file, it will automatically be included
+ // in the common prefix, so we need not special-case it here.
+ out.write(key, keyOffset + commonPrefix, keyLength - commonPrefix -
+ KeyValue.TIMESTAMP_TYPE_SIZE);
+ }
+ out.write(key, keyOffset + keyLength - (KeyValue.TIMESTAMP_TYPE_SIZE -
+ commonTimestampPrefix), KeyValue.TIMESTAMP_SIZE -
+ commonTimestampPrefix);
+
+ // Write the type if it is not the same as before.
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ out.write(currentState.type);
+ }
+
+ // Write the value if it is not the same as before.
+ if ((flag & FLAG_SAME_VALUE) == 0) {
+ out.write(value, valueOffset, valueLength);
+ }
+ }
+ }
}
protected static class FastDiffSeekerState extends SeekerState {
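[Editor's sketch] For orientation, a minimal, self-contained sketch of the flag computation that updateInitial() performs above. The bit positions and SHIFT_TIMESTAMP_LENGTH value below are illustrative assumptions only; the real constants are defined in FastDiffDeltaEncoder.

    import java.util.Arrays;

    // Assumed bit layout for illustration; actual constants live in
    // FastDiffDeltaEncoder.
    public class FastDiffFlagSketch {
      static final int FLAG_SAME_KEY_LENGTH = 1 << 3;
      static final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
      static final int FLAG_SAME_TYPE = 1 << 5;
      static final int FLAG_SAME_VALUE = 1 << 6;
      static final int SHIFT_TIMESTAMP_LENGTH = 0; // low bits: ts prefix length

      static byte flagFor(byte[] key, byte[] prevKey, long ts, long prevTs,
          byte type, byte prevType, byte[] value, byte[] prevValue) {
        int flag = 0;
        if (key.length == prevKey.length) flag |= FLAG_SAME_KEY_LENGTH;
        if (value.length == prevValue.length) flag |= FLAG_SAME_VALUE_LENGTH;
        if (type == prevType) flag |= FLAG_SAME_TYPE;
        // Same trick as the writer: compare lengths first, then contents.
        if (value.length == prevValue.length && Arrays.equals(value, prevValue)) {
          flag |= FLAG_SAME_VALUE;
        }
        // Count leading bytes the two big-endian timestamps share (0..8).
        int tsPrefix = 0;
        for (int shift = 56; shift >= 0; shift -= 8) {
          if (((ts >>> shift) & 0xFF) != ((prevTs >>> shift) & 0xFF)) break;
          tsPrefix++;
        }
        flag |= tsPrefix << SHIFT_TIMESTAMP_LENGTH;
        return (byte) flag;
      }
    }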
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Tue Oct 23 21:58:43 2012
@@ -16,14 +16,10 @@
*/
package org.apache.hadoop.hbase.io.encoding;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.FilterOutputStream;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -46,59 +42,12 @@ import org.apache.hadoop.io.RawComparato
*/
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
- private int addKV(int prevKeyOffset, DataOutputStream out,
- ByteBuffer in, int prevKeyLength) throws IOException {
- int keyLength = in.getInt();
- int valueLength = in.getInt();
-
- if (prevKeyOffset == -1) {
- // copy the key, there is no common prefix with none
- ByteBufferUtils.putCompressedInt(out, keyLength);
- ByteBufferUtils.putCompressedInt(out, valueLength);
- ByteBufferUtils.putCompressedInt(out, 0);
- ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
- } else {
- // find a common prefix and skip it
- int common = ByteBufferUtils.findCommonPrefix(
- in, prevKeyOffset + KeyValue.ROW_OFFSET,
- in.position(),
- Math.min(prevKeyLength, keyLength));
-
- ByteBufferUtils.putCompressedInt(out, keyLength - common);
- ByteBufferUtils.putCompressedInt(out, valueLength);
- ByteBufferUtils.putCompressedInt(out, common);
-
- ByteBufferUtils.skip(in, common);
- ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
- + valueLength);
- }
-
- return keyLength;
- }
-
- @Override
- public void encodeKeyValues(DataOutputStream writeHere,
- ByteBuffer in, boolean includesMemstoreTS) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(writeHere, in.limit());
- int prevOffset = -1;
- int offset = 0;
- int keyLength = 0;
- while (in.hasRemaining()) {
- offset = in.position();
- keyLength = addKV(prevOffset, writeHere, in, keyLength);
- afterEncodingKeyValue(in, writeHere, includesMemstoreTS);
- prevOffset = offset;
- }
- }
-
@Override
- public ByteBuffer decodeKeyValues(DataInputStream source,
- int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
- throws IOException {
+ public ByteBuffer decodeKeyValues(DataInputStream source, int allocHeaderLength,
+ boolean includesMemstoreTS, int totalEncodedSize) throws IOException {
+ int skipLastBytes = source.available() - totalEncodedSize;
int decompressedSize = source.readInt();
- ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
- allocHeaderLength);
+ ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength);
buffer.position(allocHeaderLength);
int prevKeyOffset = 0;
@@ -108,16 +57,15 @@ public class PrefixKeyDeltaEncoder exten
}
if (source.available() != skipLastBytes) {
- throw new IllegalStateException("Read too many bytes.");
+ throw new IOException("Read too many bytes");
}
buffer.limit(buffer.position());
return buffer;
}
- private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
- int prevKeyOffset)
- throws IOException, EncoderBufferTooSmallException {
+ private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset)
+ throws IOException, EncoderBufferTooSmallException {
int keyLength = ByteBufferUtils.readCompressedInt(source);
int valueLength = ByteBufferUtils.readCompressedInt(source);
int commonLength = ByteBufferUtils.readCompressedInt(source);
@@ -162,8 +110,38 @@ public class PrefixKeyDeltaEncoder exten
}
@Override
- public String toString() {
- return PrefixKeyDeltaEncoder.class.getSimpleName();
+ public PrefixKeyDeltaEncoderWriter createWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ return new PrefixKeyDeltaEncoderWriter(out, includesMemstoreTS);
+ }
+
+ /**
+ * A writer that incrementally performs Prefix Key Delta Encoding
+ */
+ private static class PrefixKeyDeltaEncoderWriter extends BufferedEncodedWriter<EncodingState> {
+ public PrefixKeyDeltaEncoderWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ super(out, includesMemstoreTS);
+ }
+
+ @Override
+ EncodingState createState() {
+ return new EncodingState();
+ }
+
+ @Override
+ protected void updateInitial(byte[] key, int keyOffset, int keyLength, byte[] value,
+ int valueOffset, int valueLength) throws IOException {
+ int common = prevState == null ? 0 : getCommonPrefixLength(key, keyOffset, keyLength,
+ this.prevState.key, this.prevState.keyOffset, this.prevState.keyLength);
+
+ ByteBufferUtils.putCompressedInt(this.out, keyLength - common);
+ ByteBufferUtils.putCompressedInt(this.out, valueLength);
+ ByteBufferUtils.putCompressedInt(this.out, common);
+
+ this.out.write(key, keyOffset + common, keyLength - common);
+ this.out.write(value, valueOffset, valueLength);
+ }
}
@Override
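[Editor's sketch] A minimal sketch of the record layout the PrefixKeyDeltaEncoderWriter above emits, using plain 4-byte ints for readability where the real writer uses ByteBufferUtils.putCompressedInt:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PrefixDeltaSketch {
      static int commonPrefix(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length), i = 0;
        while (i < n && a[i] == b[i]) i++;
        return i;
      }

      // Field order matches updateInitial(): key-suffix length, value length,
      // common-prefix length, key suffix bytes, value bytes.
      static void encode(DataOutputStream out, byte[] key, byte[] value,
          byte[] prevKey) throws IOException {
        int common = (prevKey == null) ? 0 : commonPrefix(key, prevKey);
        out.writeInt(key.length - common); // key suffix length
        out.writeInt(value.length);        // value length
        out.writeInt(common);              // shared-prefix length
        out.write(key, common, key.length - common);
        out.write(value, 0, value.length);
      }

      public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        byte[] k1 = "row1/cf:qualA".getBytes();
        byte[] k2 = "row1/cf:qualB".getBytes();
        encode(out, k1, "v1".getBytes(), null); // first key: common == 0
        encode(out, k2, "v2".getBytes(), k1);   // only the differing suffix is written
      }
    }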
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Tue Oct 23 21:58:43 2012
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
@@ -120,7 +120,7 @@ public class HFileBlock extends SchemaCo
/**
* The on-disk size of the next block, including the header, obtained by
- * peeking into the first {@link HEADER_SIZE} bytes of the next block's
+ * peeking into the first {@link #HEADER_SIZE} bytes of the next block's
* header, or -1 if unknown.
*/
private int nextBlockOnDiskSizeWithHeader = -1;
@@ -227,7 +227,7 @@ public class HFileBlock extends SchemaCo
}
/**
- * Writes header fields into the first {@link HEADER_SIZE} bytes of the
+ * Writes header fields into the first {@link #HEADER_SIZE} bytes of the
* buffer. Resets the buffer position to the end of header as side effect.
*/
private void overwriteHeader() {
@@ -479,7 +479,7 @@ public class HFileBlock extends SchemaCo
* <li>Call {@link Writer#startWriting(BlockType)} and get a data stream to
* write to
* <li>Write your data into the stream
- * <li>Call {@link Writer#writeHeaderAndData()} as many times as you need to
+ * <li>Call {@link Writer#writeHeaderAndData} as many times as you need to
* store the serialized block into an external stream, or call
* {@link Writer#getHeaderAndData()} to get it as a byte array.
* <li>Repeat to write more blocks
@@ -503,6 +503,8 @@ public class HFileBlock extends SchemaCo
/** Data block encoder used for data blocks */
private final HFileDataBlockEncoder dataBlockEncoder;
+ private DataBlockEncoder.EncodedWriter encodedWriter;
+
/**
* The stream we use to accumulate data in uncompressed format for each
* block. We reset this stream at the end of each block and reuse it. The
@@ -559,15 +561,26 @@ public class HFileBlock extends SchemaCo
/** Whether we are including memstore timestamp after every key/value */
private boolean includesMemstoreTS;
+ public void appendEncodedKV(final long memstoreTS, final byte[] key,
+ final int keyOffset, final int keyLength, final byte[] value,
+ final int valueOffset, final int valueLength) throws IOException {
+ if (encodedWriter == null) {
+ throw new IOException("Must initialize encoded writer");
+ }
+ encodedWriter.update(memstoreTS, key, keyOffset, keyLength, value,
+ valueOffset, valueLength);
+ }
+
/**
* @param compressionAlgorithm compression algorithm to use
- * @param dataBlockEncoderAlgo data block encoding algorithm to use
+ * @param dataBlockEncoder data block encoding algorithm to use
*/
public Writer(Compression.Algorithm compressionAlgorithm,
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
this.dataBlockEncoder = dataBlockEncoder != null
? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
+ this.encodedWriter = null;
baosInMemory = new ByteArrayOutputStream();
if (compressAlgo != NONE) {
@@ -614,18 +627,25 @@ public class HFileBlock extends SchemaCo
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
- return userDataStream;
+
+ // We only encode data blocks.
+ if (this.blockType == BlockType.DATA) {
+ this.encodedWriter = this.dataBlockEncoder.getEncodedWriter(this.userDataStream,
+ this.includesMemstoreTS);
+ this.encodedWriter.reserveMetadataSpace();
+ return null; // The caller should use appendEncodedKV
+ } else {
+ this.encodedWriter = null;
+ return userDataStream;
+ }
}
/**
- * Returns the stream for the user to write to. The block writer takes care
- * of handling compression and buffering for caching on write. Can only be
- * called in the "writing" state.
- *
- * @return the data output stream for the user to write to
+ * Returns the stream the user writes data block contents into. This is unsafe and
+ * should only be used in tests. Key/value pairs should be appended using
+ * {@link #appendEncodedKV(long, byte[], int, int, byte[], int, int)}.
*/
- DataOutputStream getUserDataStream() {
- expectState(State.WRITING);
+ DataOutputStream getUserDataStreamUnsafe() {
return userDataStream;
}
@@ -706,31 +726,16 @@ public class HFileBlock extends SchemaCo
return; // skip any non-data block
}
- // do data block encoding, if data block encoder is set
- ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
+ if (this.encodedWriter == null) {
+ throw new IOException("All data blocks must use an encoded writer");
+ }
+
+ if (this.dataBlockEncoder.finishEncoding(uncompressedBytesWithHeader,
HEADER_SIZE, uncompressedBytesWithHeader.length -
- HEADER_SIZE).slice();
- Pair<ByteBuffer, BlockType> encodingResult =
- dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
- includesMemstoreTS);
-
- BlockType encodedBlockType = encodingResult.getSecond();
- if (encodedBlockType == BlockType.ENCODED_DATA) {
- uncompressedBytesWithHeader = encodingResult.getFirst().array();
- blockType = BlockType.ENCODED_DATA;
- } else {
- // There is no encoding configured. Do some extra sanity-checking.
- if (encodedBlockType != BlockType.DATA) {
- throw new IOException("Unexpected block type coming out of data " +
- "block encoder: " + encodedBlockType);
- }
- if (userDataStream.size() !=
- uncompressedBytesWithHeader.length - HEADER_SIZE) {
- throw new IOException("Uncompressed size mismatch: "
- + userDataStream.size() + " vs. "
- + (uncompressedBytesWithHeader.length - HEADER_SIZE));
- }
+ HEADER_SIZE, this.encodedWriter)) {
+ this.blockType = BlockType.ENCODED_DATA;
}
+ this.encodedWriter = null;
}
/**
@@ -776,7 +781,7 @@ public class HFileBlock extends SchemaCo
* @param out the output stream to write the header and data to
* @throws IOException
*/
- private void writeHeaderAndData(DataOutputStream out) throws IOException {
+ void writeHeaderAndData(DataOutputStream out) throws IOException {
ensureBlockReady();
out.write(onDiskBytesWithHeader);
}
@@ -1073,18 +1078,26 @@ public class HFileBlock extends SchemaCo
t0 = EnvironmentEdgeManager.currentTimeMillis();
numPositionalRead.incrementAndGet();
- int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
- if (pData != null) {
- t1 = EnvironmentEdgeManager.currentTimeMillis();
- timeToRead = t1 - t0;
- pData.addToHist(ProfilingData.HFILE_BLOCK_P_READ_TIME_MS, timeToRead);
- }
- if (ret < size) {
- throw new IOException("Positional read of " + size + " bytes " +
- "failed at offset " + fileOffset + " (returned " + ret + ")");
+ int sizeToRead = size + extraSize;
+ int sizeRead = 0;
+ if (sizeToRead > 0) {
+ sizeRead = istream.read(fileOffset, dest, destOffset, sizeToRead);
+ if (pData != null) {
+ t1 = EnvironmentEdgeManager.currentTimeMillis();
+ timeToRead = t1 - t0;
+ pData.addToHist(ProfilingData.HFILE_BLOCK_P_READ_TIME_MS, timeToRead);
+ }
+ if (size == 0 && sizeRead == -1) {
+ // a degenerate case of a zero-size block and no next block header.
+ sizeRead = 0;
+ }
+ if (sizeRead < size) {
+ throw new IOException("Positional read of " + sizeToRead + " bytes " +
+ "failed at offset " + fileOffset + " (returned " + sizeRead + ")");
+ }
}
- if (ret == size || ret < size + extraSize) {
+ if (sizeRead == size || sizeRead < sizeToRead) {
// Could not read the next block's header, or did not try.
return -1;
}
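[Editor's sketch] The guard above exists because a positional read of a zero-size block with no following header returns -1 from the stream, which the old code misread as a failed read. A compact sketch of the same control flow, with PositionedReader standing in for FSDataInputStream's positional read:

    import java.io.IOException;

    interface PositionedReader {
      int read(long position, byte[] buffer, int offset, int length)
          throws IOException;
    }

    class PreadGuardSketch {
      static int readWithExtra(PositionedReader in, long fileOffset, byte[] dest,
          int destOffset, int size, int extraSize) throws IOException {
        int sizeToRead = size + extraSize;
        int sizeRead = 0;
        if (sizeToRead > 0) {
          sizeRead = in.read(fileOffset, dest, destOffset, sizeToRead);
          if (size == 0 && sizeRead == -1) {
            sizeRead = 0; // zero-size block with no next header: not a failure
          }
          if (sizeRead < size) {
            throw new IOException("Positional read of " + sizeToRead
                + " bytes failed at offset " + fileOffset
                + " (returned " + sizeRead + ")");
          }
        }
        // -1 signals "next block's header not read", as in the code above.
        return (sizeRead == size || sizeRead < sizeToRead) ? -1 : sizeRead;
      }
    }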
@@ -1146,23 +1159,14 @@ public class HFileBlock extends SchemaCo
* Decompresses data from the given stream using the configured compression
* algorithm.
*
- * @param boundedStream
- * a stream to read compressed data from, bounded to the exact
- * amount of compressed data
- * @param compressedSize
- * compressed data size, header not included
* @param uncompressedSize
* uncompressed data size, header not included
- * @param header
- * the header to include before the decompressed data, or null.
- * Only the first {@link HFileBlock#HEADER_SIZE} bytes of the
- * buffer are included.
* @return the byte buffer containing the given header (optionally) and the
* decompressed data
* @throws IOException
*/
protected void decompress(byte[] dest, int destOffset,
- InputStream bufferedBoundedStream, int compressedSize,
+ InputStream bufferedBoundedStream,
int uncompressedSize) throws IOException {
Decompressor decompressor = null;
try {
@@ -1281,7 +1285,7 @@ public class HFileBlock extends SchemaCo
InputStream bufferedBoundedStream = createBufferedBoundedStream(
offset, onDiskSize, pread);
decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
- bufferedBoundedStream, onDiskSize, uncompressedSizeWithMagic);
+ bufferedBoundedStream, uncompressedSizeWithMagic);
// We don't really have a good way to exclude the "magic record" size
// from the compressed block's size, since it is compressed as well.
@@ -1448,7 +1452,7 @@ public class HFileBlock extends SchemaCo
b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
- onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+ b.uncompressedSizeWithoutHeader);
// Copy next block's header bytes into the new block if we have them.
if (nextBlockOnDiskSize > 0) {
@@ -1511,7 +1515,7 @@ public class HFileBlock extends SchemaCo
compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader));
decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
- b.onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+ b.uncompressedSizeWithoutHeader);
if (b.nextBlockOnDiskSizeWithHeader > 0) {
// Copy the next block's header into the new block.
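[Editor's sketch] Pulling the HFileBlock.Writer changes together: the block writer no longer hands out its raw stream for data blocks; callers append key/values and the encoder finishes the block. A hedged sketch of the calling sequence, with local stand-ins (NOT the HBase types) for the interfaces involved:

    import java.io.IOException;

    interface EncodedWriterSketch {
      void reserveMetadataSpace() throws IOException;
      void update(long memstoreTS, byte[] key, int keyOffset, int keyLength,
          byte[] value, int valueOffset, int valueLength) throws IOException;
      boolean finishEncoding(byte[] data, int offset, int length)
          throws IOException;
    }

    class DataBlockWriteFlow {
      static void writeDataBlock(EncodedWriterSketch writer, byte[][] keys,
          byte[][] values, byte[] uncompressedBytesWithHeader, int headerSize)
          throws IOException {
        writer.reserveMetadataSpace(); // startWriting(): room for encoder metadata
        for (int i = 0; i < keys.length; i++) {
          // HFileWriterV2.append() -> Writer.appendEncodedKV() -> update()
          writer.update(0L, keys[i], 0, keys[i].length,
              values[i], 0, values[i].length);
        }
        // finishBlock(): the encoder patches its metadata and reports whether
        // the block should be tagged ENCODED_DATA (true) or plain DATA (false).
        boolean encoded = writer.finishEncoding(uncompressedBytesWithHeader,
            headerSize, uncompressedBytesWithHeader.length - headerSize);
        if (encoded) {
          // the caller would set blockType = BlockType.ENCODED_DATA here
        }
      }
    }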
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -16,9 +16,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Pair;
@@ -82,4 +84,23 @@ public interface HFileDataBlockEncoder {
*/
public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
+ /**
+ * @return an incremental encoding writer that uses the given output stream
+ */
+ public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException;
+
+ /**
+ * Completes the encoding process.
+ *
+ * @param data The encoded data so far
+ * @param offset Location of encoded bytes in data
+ * @param length Length of encoded bytes in data
+ * @param writer the EncodedWriter that was used to encode the data
+ * @return true if the data was stored in an encoded format, false if unencoded
+ * @throws IOException
+ */
+ public boolean finishEncoding(byte[] data, final int offset, final int length,
+ DataBlockEncoder.EncodedWriter writer) throws IOException;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Tue Oct 23 21:58:43 2012
@@ -209,4 +209,20 @@ public class HFileDataBlockEncoderImpl i
inCache + ")";
}
+ @Override
+ public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ if (onDisk != DataBlockEncoding.NONE) {
+ this.onDisk.writeIdInBytes(out);
+ }
+
+ return this.onDisk.getEncoder().createWriter(out, includesMemstoreTS);
+ }
+
+ @Override
+ public boolean finishEncoding(byte[] data, final int offset, final int length,
+ DataBlockEncoder.EncodedWriter writer) throws IOException {
+ int bytesToSkip = onDisk.encodingIdSize();
+ return writer.finishEncoding(data, offset + bytesToSkip, length - bytesToSkip);
+ }
}
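[Editor's sketch] The two overrides above imply a simple framing for encoded blocks: the encoding id bytes are written first (getEncodedWriter), and finishEncoding must skip them when handed the accumulated block bytes. A sketch assuming a 2-byte id; the real size comes from encodingIdSize():

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class EncodedBlockFraming {
      static final int ID_SIZE = 2; // assumption; real value: onDisk.encodingIdSize()

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(block);
        out.writeShort(4);               // writeIdInBytes(): hypothetical encoding id
        out.write(new byte[] {1, 2, 3}); // encoder output follows the id
        byte[] data = block.toByteArray();

        // finishEncoding must not treat the id bytes as encoded key/values,
        // hence offset + bytesToSkip and length - bytesToSkip above.
        int encodedOffset = ID_SIZE;
        int encodedLength = data.length - ID_SIZE;
        System.out.println("encoded payload: offset=" + encodedOffset
            + " length=" + encodedLength);
      }
    }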
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Tue Oct 23 21:58:43 2012
@@ -23,7 +23,6 @@ import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.io.encodi
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
import org.apache.hadoop.hbase.ipc.ProfilingData;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Tue Oct 23 21:58:43 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
/**
* Writes HFile format version 2.
@@ -282,9 +281,9 @@ public class HFileWriterV2 extends Abstr
* @param vlength
* @throws IOException
*/
- private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
- final byte[] value, final int voffset, final int vlength)
- throws IOException {
+ private void append(final long memstoreTS, final byte[] key,
+ final int koffset, final int klength, final byte[] value,
+ final int voffset, final int vlength) throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
@@ -296,18 +295,10 @@ public class HFileWriterV2 extends Abstr
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
- {
- DataOutputStream out = fsBlockWriter.getUserDataStream();
- out.writeInt(klength);
- totalKeyLength += klength;
- out.writeInt(vlength);
- totalValueLength += vlength;
- out.write(key, koffset, klength);
- out.write(value, voffset, vlength);
- if (this.includeMemstoreTS) {
- WritableUtils.writeVLong(out, memstoreTS);
- }
- }
+ this.fsBlockWriter.appendEncodedKV(memstoreTS, key, koffset, klength, value,
+ voffset, vlength);
+ totalKeyLength += klength;
+ totalValueLength += vlength;
// Are we the first key in this block?
if (firstKeyInBlock == null) {
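[Editor's sketch] The block removed above wrote the unencoded key/value layout by hand; after this change the same bytes are produced through the block writer's encoded path. For reference, a sketch of that legacy layout (WritableUtils is the Hadoop class the old code used):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    class RawKVLayout {
      // The unencoded record layout HFileWriterV2.append() used to emit inline.
      static void writeRawKV(DataOutputStream out, long memstoreTS,
          boolean includeMemstoreTS, byte[] key, int koffset, int klength,
          byte[] value, int voffset, int vlength) throws IOException {
        out.writeInt(klength);              // key length
        out.writeInt(vlength);              // value length
        out.write(key, koffset, klength);   // key bytes
        out.write(value, voffset, vlength); // value bytes
        if (includeMemstoreTS) {
          WritableUtils.writeVLong(out, memstoreTS); // optional memstore timestamp
        }
      }
    }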
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -16,8 +16,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Pair;
@@ -77,4 +80,16 @@ public class NoOpDataBlockEncoder implem
return getClass().getSimpleName();
}
+ @Override
+ public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out,
+ boolean includesMemstoreTS) throws IOException {
+ return DataBlockEncoding.NONE.getEncoder().createWriter(out,
+ includesMemstoreTS);
+ }
+
+ @Override
+ public boolean finishEncoding(byte[] data, final int offset, final int length,
+ DataBlockEncoder.EncodedWriter writer) throws IOException {
+ return writer.finishEncoding(data, offset, length);
+ }
}
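[Editor's note] Design note: with these overrides, NoOpDataBlockEncoder no longer needs a private unencoded path. It obtains its writer from DataBlockEncoding.NONE's encoder like any other encoding, so the raw key/value serialization lives in one place and HFileBlock.Writer can drive every data block, encoded or not, through the same EncodedWriter interface.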