Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:19:30 UTC
svn commit: r1181555 [4/4] - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/io/hfile/
test/java/org/apache/hadoop/hbase/io/hfile/
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,453 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writes HFile format version 2.
+ */
+public class HFileWriterV2 extends AbstractHFileWriter {
+
+ /** Inline block writers for multi-level block index and compound Blooms. */
+ private List<InlineBlockWriter> inlineBlockWriters =
+ new ArrayList<InlineBlockWriter>();
+
+ /** Unified version 2 block writer */
+ private HFileBlock.Writer fsBlockWriter;
+
+ private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
+ private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
+
+ /** The offset of the first data block or -1 if the file is empty. */
+ private long firstDataBlockOffset = -1;
+
+ /** The offset of the last data block or 0 if the file is empty. */
+ private long lastDataBlockOffset;
+
+ /** Additional data items to be written to the "load-on-open" section. */
+ private List<BlockWritable> additionalLoadOnOpenData =
+ new ArrayList<BlockWritable>();
+
+ static class WriterFactoryV2 extends HFile.WriterFactory {
+
+ WriterFactoryV2(Configuration conf) { super(conf); }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path)
+ throws IOException {
+ return new HFileWriterV2(conf, fs, path);
+ }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path, int blockSize,
+ int bytesPerChecksum, Compression.Algorithm compress,
+ final KeyComparator comparator) throws IOException {
+ return new HFileWriterV2(conf, fs, path, blockSize, bytesPerChecksum,
+ compress, comparator);
+ }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path, int blockSize,
+ int bytesPerChecksum, String compress, final KeyComparator comparator)
+ throws IOException {
+ return new HFileWriterV2(conf, fs, path, blockSize, bytesPerChecksum,
+ compress, comparator);
+ }
+
+ @Override
+ public Writer createWriter(final FSDataOutputStream ostream,
+ final int blockSize, final String compress,
+ final KeyComparator comparator) throws IOException {
+ return new HFileWriterV2(conf, ostream, blockSize, compress, comparator);
+ }
+
+ @Override
+ public Writer createWriter(final FSDataOutputStream ostream,
+ final int blockSize, final Compression.Algorithm compress,
+ final KeyComparator c) throws IOException {
+ return new HFileWriterV2(conf, ostream, blockSize, compress, c);
+ }
+ }
+
+ /** Constructor that uses all defaults for compression and block size. */
+ public HFileWriterV2(Configuration conf, FileSystem fs, Path path)
+ throws IOException {
+ this(conf, fs, path, HFile.DEFAULT_BLOCKSIZE,
+ HFile.DEFAULT_BYTES_PER_CHECKSUM, HFile.DEFAULT_COMPRESSION_ALGORITHM,
+ null);
+ }
+
+ /**
+ * Constructor that takes a path, creates and closes the output stream. Takes
+ * compression algorithm name as string.
+ */
+ public HFileWriterV2(Configuration conf, FileSystem fs, Path path,
+ int blockSize, int bytesPerChecksum, String compressAlgoName,
+ final KeyComparator comparator) throws IOException {
+ this(conf, fs, path, blockSize, bytesPerChecksum,
+ compressionByName(compressAlgoName), comparator);
+ }
+
+ /** Constructor that takes a path, creates and closes the output stream. */
+ public HFileWriterV2(Configuration conf, FileSystem fs, Path path,
+ int blockSize, int bytesPerChecksum, Compression.Algorithm compressAlgo,
+ final KeyComparator comparator) throws IOException {
+ super(conf, createOutputStream(conf, fs, path, bytesPerChecksum), path,
+ blockSize, compressAlgo, comparator);
+ finishInit(conf);
+ }
+
+ /** Constructor that takes a stream. */
+ public HFileWriterV2(final Configuration conf,
+ final FSDataOutputStream outputStream, final int blockSize,
+ final String compressAlgoName, final KeyComparator comparator)
+ throws IOException {
+ this(conf, outputStream, blockSize,
+ Compression.getCompressionAlgorithmByName(compressAlgoName),
+ comparator);
+ }
+
+ /** Constructor that takes a stream. */
+ public HFileWriterV2(final Configuration conf,
+ final FSDataOutputStream outputStream, final int blockSize,
+ final Compression.Algorithm compress, final KeyComparator comparator)
+ throws IOException {
+ super(conf, outputStream, null, blockSize, compress, comparator);
+ finishInit(conf);
+ }
+
+ /** Additional initialization steps */
+ private void finishInit(final Configuration conf) {
+ if (fsBlockWriter != null)
+ throw new IllegalStateException("finishInit called twice");
+
+ // HFile filesystem-level (non-caching) block writer
+ fsBlockWriter = new HFileBlock.Writer(compressAlgo);
+
+ // Data block index writer
+ dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
+ cacheIndexBlocksOnWrite ? blockCache : null,
+ cacheIndexBlocksOnWrite ? name : null);
+ dataBlockIndexWriter.setMaxChunkSize(
+ HFileBlockIndex.getMaxChunkSize(conf));
+ inlineBlockWriters.add(dataBlockIndexWriter);
+
+ // Meta data block index writer
+ metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
+ }
+
+ /**
+   * At a block boundary, writes all the inline blocks and opens a new block.
+ *
+ * @throws IOException
+ */
+ private void checkBlockBoundary() throws IOException {
+ if (fsBlockWriter.blockSizeWritten() < blockSize)
+ return;
+
+ finishBlock();
+ writeInlineBlocks(false);
+ newBlock();
+ }
+
+ /** Clean up the current block */
+ private void finishBlock() throws IOException {
+ if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
+ return;
+
+ long now = System.currentTimeMillis();
+
+ // Update the first data block offset for scanning.
+ if (firstDataBlockOffset == -1)
+ firstDataBlockOffset = outputStream.getPos();
+
+ // Update the last data block offset
+ lastDataBlockOffset = outputStream.getPos();
+
+ fsBlockWriter.writeHeaderAndData(outputStream);
+
+ int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
+ dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
+ onDiskSize);
+ totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+
+ HFile.writeTime += System.currentTimeMillis() - now;
+ HFile.writeOps++;
+
+ if (cacheDataBlocksOnWrite) {
+ blockCache.cacheBlock(HFile.getBlockCacheKey(name, lastDataBlockOffset),
+ fsBlockWriter.getBlockForCaching());
+ }
+ }
+
+ /** Gives inline block writers an opportunity to contribute blocks. */
+ private void writeInlineBlocks(boolean closing) throws IOException {
+ for (InlineBlockWriter ibw : inlineBlockWriters) {
+ while (ibw.shouldWriteBlock(closing)) {
+ long offset = outputStream.getPos();
+ boolean cacheThisBlock = ibw.cacheOnWrite();
+ ibw.writeInlineBlock(fsBlockWriter.startWriting(
+ ibw.getInlineBlockType(), cacheThisBlock));
+ fsBlockWriter.writeHeaderAndData(outputStream);
+ ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
+ fsBlockWriter.getUncompressedSizeWithoutHeader());
+
+ if (cacheThisBlock) {
+ // Cache this block on write.
+ blockCache.cacheBlock(HFile.getBlockCacheKey(name, offset),
+ fsBlockWriter.getBlockForCaching());
+ }
+ }
+ }
+ }
+
+ /**
+ * Ready a new block for writing.
+ *
+ * @throws IOException
+ */
+ private void newBlock() throws IOException {
+ // This is where the next block begins.
+ fsBlockWriter.startWriting(BlockType.DATA, cacheDataBlocksOnWrite);
+ firstKeyInBlock = null;
+ }
+
+ /**
+   * Add a meta block to the end of the file. Call before close(). Metadata
+   * blocks are expensive: fill one with a batch of serialized data rather than
+   * writing a metadata block per metadata instance. If metadata is small,
+   * consider adding it to the file info using
+   * {@link #appendFileInfo(byte[], byte[])}.
+ *
+ * @param metaBlockName
+ * name of the block
+ * @param content
+ * will call readFields to get data later (DO NOT REUSE)
+ */
+ @Override
+ public void appendMetaBlock(String metaBlockName, Writable content) {
+ byte[] key = Bytes.toBytes(metaBlockName);
+ int i;
+ for (i = 0; i < metaNames.size(); ++i) {
+ // stop when the current key is greater than our own
+ byte[] cur = metaNames.get(i);
+ if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
+ key.length) > 0) {
+ break;
+ }
+ }
+ metaNames.add(i, key);
+ metaData.add(i, content);
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param kv
+ * KeyValue to add. Cannot be empty nor null.
+ * @throws IOException
+ */
+ @Override
+ public void append(final KeyValue kv) throws IOException {
+ append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+ kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param key
+ * Key to add. Cannot be empty nor null.
+ * @param value
+ * Value to add. Cannot be empty nor null.
+ * @throws IOException
+ */
+ @Override
+ public void append(final byte[] key, final byte[] value) throws IOException {
+ append(key, 0, key.length, value, 0, value.length);
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param key
+ * @param koffset
+ * @param klength
+ * @param value
+ * @param voffset
+ * @param vlength
+ * @throws IOException
+ */
+ private void append(final byte[] key, final int koffset, final int klength,
+ final byte[] value, final int voffset, final int vlength)
+ throws IOException {
+ boolean dupKey = checkKey(key, koffset, klength);
+ checkValue(value, voffset, vlength);
+ if (!dupKey) {
+ checkBlockBoundary();
+ }
+
+ if (!fsBlockWriter.isWriting())
+ newBlock();
+
+ // Write length of key and value and then actual key and value bytes.
+ {
+ DataOutputStream out = fsBlockWriter.getUserDataStream();
+ out.writeInt(klength);
+ totalKeyLength += klength;
+ out.writeInt(vlength);
+ totalValueLength += vlength;
+ out.write(key, koffset, klength);
+ out.write(value, voffset, vlength);
+ }
+
+ // Are we the first key in this block?
+ if (firstKeyInBlock == null) {
+ // Copy the key.
+ firstKeyInBlock = new byte[klength];
+ System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
+ }
+
+ lastKeyBuffer = key;
+ lastKeyOffset = koffset;
+ lastKeyLength = klength;
+ entryCount++;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (outputStream == null) {
+ return;
+ }
+    // Write out the end of the data blocks, then write the meta data blocks,
+    // followed by the file info, data block index, and meta block index.
+
+ finishBlock();
+ writeInlineBlocks(true);
+
+ FixedFileTrailer trailer = new FixedFileTrailer(2);
+
+ // Write out the metadata blocks if any.
+ if (metaNames.size() > 0) {
+ for (int i = 0; i < metaNames.size(); ++i) {
+ // store the beginning offset
+ long offset = outputStream.getPos();
+ // write the metadata content
+ DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META,
+ cacheDataBlocksOnWrite);
+ metaData.get(i).write(dos);
+
+ fsBlockWriter.writeHeaderAndData(outputStream);
+
+ // Add the new meta block to the meta index.
+ metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
+ fsBlockWriter.getOnDiskSizeWithHeader());
+ }
+ }
+
+ // Load-on-open section.
+
+ // Data block index.
+ //
+ // In version 2, this section of the file starts with the root level data
+ // block index. We call a function that writes intermediate-level blocks
+ // first, then root level, and returns the offset of the root level block
+ // index.
+
+ long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
+ trailer.setLoadOnOpenOffset(rootIndexOffset);
+
+ // Meta block index.
+ metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
+ BlockType.ROOT_INDEX, false), "meta");
+ fsBlockWriter.writeHeaderAndData(outputStream);
+
+ // File info
+ writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
+ false));
+ fsBlockWriter.writeHeaderAndData(outputStream);
+
+ // Load-on-open data supplied by higher levels, e.g. Bloom filters.
+ for (BlockWritable w : additionalLoadOnOpenData)
+ fsBlockWriter.writeBlock(w, outputStream);
+
+ // Now finish off the trailer.
+ trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
+ trailer.setUncompressedDataIndexSize(
+ dataBlockIndexWriter.getTotalUncompressedSize());
+ trailer.setFirstDataBlockOffset(firstDataBlockOffset);
+ trailer.setLastDataBlockOffset(lastDataBlockOffset);
+ trailer.setComparatorClass(comparator.getClass());
+ trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
+
+ finishClose(trailer);
+
+ fsBlockWriter.releaseCompressor();
+ }
+
+ @Override
+ public void addInlineBlockWriter(InlineBlockWriter ibw) {
+ inlineBlockWriters.add(ibw);
+ if (blockCache == null && ibw.cacheOnWrite())
+ initBlockCache();
+ }
+
+ @Override
+ public void addBloomFilter(final BloomFilterWriter bfw) {
+ if (bfw.getKeyCount() <= 0)
+ return;
+
+ additionalLoadOnOpenData.add(new BlockWritable() {
+ @Override
+ public BlockType getBlockType() {
+ return BlockType.BLOOM_META;
+ }
+
+ @Override
+ public void writeToBlock(DataOutput out) throws IOException {
+ bfw.getMetaWriter().write(out);
+ Writable dataWriter = bfw.getDataWriter();
+ if (dataWriter != null)
+ dataWriter.write(out);
+ }
+ });
+ }
+
+}
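
For orientation, here is a minimal usage sketch of the writer added above,
mirroring what TestHFileWriterV2 (added below) does. The path and keys are
illustrative only, not part of this commit. After close(), the tail of the
file holds the root data block index, the meta block index, the file info,
any extra load-on-open blocks (e.g. Bloom filter metadata), and the fixed
trailer:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.Compression;
    import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;

    public class HFileV2WriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // 4K blocks, 512 bytes per checksum, no compression. Keys must be
        // appended in comparator order.
        HFileWriterV2 writer = new HFileWriterV2(conf, fs,
            new Path("/tmp/example.hfile"), 4096, 512,
            Compression.Algorithm.NONE, KeyValue.KEY_COMPARATOR);
        writer.append(Bytes.toBytes("key1"), Bytes.toBytes("value1"));
        writer.append(Bytes.toBytes("key2"), Bytes.toBytes("value2"));
        // Meta blocks may be added in any order; appendMetaBlock keeps them
        // sorted lexicographically by name.
        writer.appendMetaBlock("EXAMPLE_META", new Text("meta payload"));
        writer.close();
      }
    }
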
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+
+@RunWith(Parameterized.class)
+public class TestFixedFileTrailer {
+
+ private static final Log LOG = LogFactory.getLog(TestFixedFileTrailer.class);
+
+  /** The number of fields used by each version, indexed by version minus one. */
+ private static final int[] NUM_FIELDS_BY_VERSION = new int[] { 8, 13 };
+
+ private HBaseTestingUtility util = new HBaseTestingUtility();
+ private FileSystem fs;
+ private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ private int version;
+
+ static {
+ assert NUM_FIELDS_BY_VERSION.length == HFile.MAX_FORMAT_VERSION
+ - HFile.MIN_FORMAT_VERSION + 1;
+ }
+
+ public TestFixedFileTrailer(int version) {
+ this.version = version;
+ }
+
+ @Parameters
+ public static Collection<Object[]> getParameters() {
+ List<Object[]> versionsToTest = new ArrayList<Object[]>();
+ for (int v = HFile.MIN_FORMAT_VERSION; v <= HFile.MAX_FORMAT_VERSION; ++v)
+ versionsToTest.add(new Integer[] { v } );
+ return versionsToTest;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ fs = FileSystem.get(util.getConfiguration());
+ }
+
+ @Test
+ public void testTrailer() throws IOException {
+ FixedFileTrailer t = new FixedFileTrailer(version);
+ t.setDataIndexCount(3);
+ t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
+
+ if (version == 1) {
+ t.setFileInfoOffset(876);
+ }
+
+ if (version == 2) {
+ t.setLastDataBlockOffset(291);
+ t.setNumDataIndexLevels(3);
+ t.setComparatorClass(KeyValue.KEY_COMPARATOR.getClass());
+ t.setFirstDataBlockOffset(9081723123L); // Completely unrealistic.
+ t.setUncompressedDataIndexSize(827398717L); // Something random.
+ }
+
+ t.setLoadOnOpenOffset(128);
+ t.setMetaIndexCount(7);
+
+ t.setTotalUncompressedBytes(129731987);
+
+ {
+ DataOutputStream dos = new DataOutputStream(baos); // Limited scope.
+ t.serialize(dos);
+ dos.flush();
+ assertEquals(dos.size(), FixedFileTrailer.getTrailerSize(version));
+ }
+
+ byte[] bytes = baos.toByteArray();
+ baos.reset();
+
+ assertEquals(bytes.length, FixedFileTrailer.getTrailerSize(version));
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+ // Finished writing, trying to read.
+ {
+ DataInputStream dis = new DataInputStream(bais);
+ FixedFileTrailer t2 = new FixedFileTrailer(version);
+ t2.deserialize(dis);
+ assertEquals(-1, bais.read()); // Ensure we have read everything.
+ checkLoadedTrailer(version, t, t2);
+ }
+
+ // Now check what happens if the trailer is corrupted.
+ Path trailerPath = new Path(HBaseTestingUtility.getTestDir(), "trailer_"
+ + version);
+
+ {
+ for (byte invalidVersion : new byte[] { HFile.MIN_FORMAT_VERSION - 1,
+ HFile.MAX_FORMAT_VERSION + 1}) {
+ bytes[bytes.length - 1] = invalidVersion;
+ writeTrailer(trailerPath, null, bytes);
+ try {
+ readTrailer(trailerPath);
+ fail("Exception expected");
+ } catch (IOException ex) {
+ // Make it easy to debug this.
+ String msg = ex.getMessage();
+ String cleanMsg = msg.replaceAll(
+ "^(java(\\.[a-zA-Z]+)+:\\s+)?|\\s+\\(.*\\)\\s*$", "");
+ assertEquals("Actual exception message is \"" + msg + "\".\n" +
+ "Cleaned-up message", // will be followed by " expected: ..."
+ "Invalid HFile version: " + invalidVersion, cleanMsg);
+ LOG.info("Got an expected exception: " + msg);
+ }
+ }
+
+ }
+
+ // Now write the trailer into a file and auto-detect the version.
+ writeTrailer(trailerPath, t, null);
+
+ FixedFileTrailer t4 = readTrailer(trailerPath);
+
+ checkLoadedTrailer(version, t, t4);
+
+ String trailerStr = t.toString();
+ assertEquals("Invalid number of fields in the string representation "
+ + "of the trailer: " + trailerStr, NUM_FIELDS_BY_VERSION[version - 1],
+ trailerStr.split(", ").length);
+ assertEquals(trailerStr, t4.toString());
+ }
+
+ private FixedFileTrailer readTrailer(Path trailerPath) throws IOException {
+ FSDataInputStream fsdis = fs.open(trailerPath);
+ FixedFileTrailer trailerRead = FixedFileTrailer.readFromStream(fsdis,
+ fs.getFileStatus(trailerPath).getLen());
+ fsdis.close();
+ return trailerRead;
+ }
+
+ private void writeTrailer(Path trailerPath, FixedFileTrailer t,
+ byte[] useBytesInstead) throws IOException {
+ assert (t == null) != (useBytesInstead == null); // Expect one non-null.
+
+ FSDataOutputStream fsdos = fs.create(trailerPath);
+ fsdos.write(135); // to make deserializer's job less trivial
+ if (useBytesInstead != null) {
+ fsdos.write(useBytesInstead);
+ } else {
+ t.serialize(fsdos);
+ }
+ fsdos.close();
+ }
+
+ private void checkLoadedTrailer(int version, FixedFileTrailer expected,
+ FixedFileTrailer loaded) throws IOException {
+ assertEquals(version, loaded.getVersion());
+ assertEquals(expected.getDataIndexCount(), loaded.getDataIndexCount());
+
+ assertEquals(Math.min(expected.getEntryCount(),
+ version == 1 ? Integer.MAX_VALUE : Long.MAX_VALUE),
+ loaded.getEntryCount());
+
+ if (version == 1) {
+ assertEquals(expected.getFileInfoOffset(), loaded.getFileInfoOffset());
+ }
+
+ if (version == 2) {
+ assertEquals(expected.getLastDataBlockOffset(),
+ loaded.getLastDataBlockOffset());
+ assertEquals(expected.getNumDataIndexLevels(),
+ loaded.getNumDataIndexLevels());
+ assertEquals(expected.createComparator().getClass().getName(),
+ loaded.createComparator().getClass().getName());
+ assertEquals(expected.getFirstDataBlockOffset(),
+ loaded.getFirstDataBlockOffset());
+ assertTrue(
+ expected.createComparator() instanceof KeyValue.KeyComparator);
+ assertEquals(expected.getUncompressedDataIndexSize(),
+ loaded.getUncompressedDataIndexSize());
+ }
+
+ assertEquals(expected.getLoadOnOpenDataOffset(),
+ loaded.getLoadOnOpenDataOffset());
+ assertEquals(expected.getMetaIndexCount(), loaded.getMetaIndexCount());
+
+ assertEquals(expected.getTotalUncompressedBytes(),
+ loaded.getTotalUncompressedBytes());
+ }
+
+}
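
A side note on the trailer API the test above exercises:
FixedFileTrailer.readFromStream() reads the trailer from the end of the file
and auto-detects the format version, so code opening an arbitrary HFile need
not know the version in advance. A minimal sketch, assuming fs and hfilePath
are set up as in the test:

    FSDataInputStream in = fs.open(hfilePath);
    long fileSize = fs.getFileStatus(hfilePath).getLen();
    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(in, fileSize);
    System.out.println("Detected HFile version: " + trailer.getVersion());
    // In a version 2 file this is where the load-on-open section (root data
    // block index, meta block index, file info, ...) begins.
    long loadOnOpenOffset = trailer.getLoadOnOpenDataOffset();
    in.close();
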
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Testing writing a version 2 {@link HFile}. This is a low-level test written
+ * during the development of {@link HFileWriterV2}.
+ */
+public class TestHFileWriterV2 {
+
+ private static final Log LOG = LogFactory.getLog(TestHFileWriterV2.class);
+
+ private static final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private Configuration conf;
+ private FileSystem fs;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = TEST_UTIL.getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testHFileFormatV2() throws IOException {
+ Path hfilePath = new Path(HBaseTestingUtility.getTestDir(),
+ "testHFileFormatV2");
+
+ final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ;
+ HFileWriterV2 writer = new HFileWriterV2(conf, fs, hfilePath, 4096, 512,
+ COMPRESS_ALGO, KeyValue.KEY_COMPARATOR);
+
+ long totalKeyLength = 0;
+ long totalValueLength = 0;
+
+ Random rand = new Random(9713312); // Just a fixed seed.
+
+ final int ENTRY_COUNT = 10000;
+ List<byte[]> keys = new ArrayList<byte[]>();
+ List<byte[]> values = new ArrayList<byte[]>();
+
+ for (int i = 0; i < ENTRY_COUNT; ++i) {
+ byte[] keyBytes = randomOrderedKey(rand, i);
+
+ // A random-length random value.
+ byte[] valueBytes = randomValue(rand);
+ writer.append(keyBytes, valueBytes);
+
+ totalKeyLength += keyBytes.length;
+ totalValueLength += valueBytes.length;
+
+ keys.add(keyBytes);
+ values.add(valueBytes);
+ }
+
+ // Add in an arbitrary order. They will be sorted lexicographically by
+ // the key.
+ writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
+ writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
+ writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
+
+ writer.close();
+
+ FSDataInputStream fsdis = fs.open(hfilePath);
+
+ // A "manual" version of a new-format HFile reader. This unit test was
+ // written before the V2 reader was fully implemented.
+
+ long fileSize = fs.getFileStatus(hfilePath).getLen();
+ FixedFileTrailer trailer =
+ FixedFileTrailer.readFromStream(fsdis, fileSize);
+
+ assertEquals(2, trailer.getVersion());
+ assertEquals(ENTRY_COUNT, trailer.getEntryCount());
+
+ HFileBlock.FSReader blockReader =
+ new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
+
+ // Counters for the number of key/value pairs and the number of blocks
+ int entriesRead = 0;
+ int blocksRead = 0;
+
+ // Scan blocks the way the reader would scan them
+ fsdis.seek(0);
+ long curBlockPos = 0;
+ while (curBlockPos <= trailer.getLastDataBlockOffset()) {
+ HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+ assertEquals(BlockType.DATA, block.getBlockType());
+ ByteBuffer buf = block.getBufferWithoutHeader();
+ while (buf.hasRemaining()) {
+ int keyLen = buf.getInt();
+ int valueLen = buf.getInt();
+
+ byte[] key = new byte[keyLen];
+ buf.get(key);
+
+ byte[] value = new byte[valueLen];
+ buf.get(value);
+
+ // A brute-force check to see that all keys and values are correct.
+ assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0);
+ assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0);
+
+ ++entriesRead;
+ }
+ ++blocksRead;
+ curBlockPos += block.getOnDiskSizeWithHeader();
+ }
+ LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead="
+ + blocksRead);
+ assertEquals(ENTRY_COUNT, entriesRead);
+
+ // Meta blocks. We can scan until the load-on-open data offset (which is
+ // the root block index offset in version 2) because we are not testing
+ // intermediate-level index blocks here.
+
+ int metaCounter = 0;
+ while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
+ LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
+ trailer.getLoadOnOpenDataOffset());
+ HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+ assertEquals(BlockType.META, block.getBlockType());
+ Text t = new Text();
+ block.readInto(t);
+      Text expectedText = metaCounter == 0 ? new Text("Paris")
+          : metaCounter == 1 ? new Text("Moscow")
+          : new Text("Washington, D.C.");
+ assertEquals(expectedText, t);
+ LOG.info("Read meta block data: " + t);
+ ++metaCounter;
+ curBlockPos += block.getOnDiskSizeWithHeader();
+ }
+
+ fsdis.close();
+ }
+
+ // Static stuff used by various HFile v2 unit tests
+
+ private static final String COLUMN_FAMILY_NAME = "_-myColumnFamily-_";
+ private static final int MIN_ROW_OR_QUALIFIER_LENGTH = 64;
+ private static final int MAX_ROW_OR_QUALIFIER_LENGTH = 128;
+
+ /**
+ * Generates a random key that is guaranteed to increase as the given index i
+ * increases. The result consists of a prefix, which is a deterministic
+ * increasing function of i, and a random suffix.
+ *
+ * @param rand
+ * random number generator to use
+   * @param i the index used to derive the increasing prefix of the key
+   * @return a key that sorts strictly after any key generated for a smaller i
+ */
+ public static byte[] randomOrderedKey(Random rand, int i) {
+ StringBuilder k = new StringBuilder();
+
+ // The fixed-length lexicographically increasing part of the key.
+ for (int bitIndex = 31; bitIndex >= 0; --bitIndex) {
+ if ((i & (1 << bitIndex)) == 0)
+ k.append("a");
+ else
+ k.append("b");
+ }
+
+ // A random-length random suffix of the key.
+ for (int j = 0; j < rand.nextInt(50); ++j)
+ k.append(randomReadableChar(rand));
+
+ byte[] keyBytes = k.toString().getBytes();
+ return keyBytes;
+ }
+
+ public static byte[] randomValue(Random rand) {
+ StringBuilder v = new StringBuilder();
+ for (int j = 0; j < 1 + rand.nextInt(2000); ++j) {
+ v.append((char) (32 + rand.nextInt(95)));
+ }
+
+ byte[] valueBytes = v.toString().getBytes();
+ return valueBytes;
+ }
+
+ public static final char randomReadableChar(Random rand) {
+ int i = rand.nextInt(26 * 2 + 10 + 1);
+ if (i < 26)
+ return (char) ('A' + i);
+ i -= 26;
+
+ if (i < 26)
+ return (char) ('a' + i);
+ i -= 26;
+
+ if (i < 10)
+ return (char) ('0' + i);
+ i -= 10;
+
+ assert i == 0;
+ return '_';
+ }
+
+ public static byte[] randomRowOrQualifier(Random rand) {
+ StringBuilder field = new StringBuilder();
+ int fieldLen = MIN_ROW_OR_QUALIFIER_LENGTH
+ + rand.nextInt(MAX_ROW_OR_QUALIFIER_LENGTH
+ - MIN_ROW_OR_QUALIFIER_LENGTH + 1);
+ for (int i = 0; i < fieldLen; ++i)
+ field.append(randomReadableChar(rand));
+ return field.toString().getBytes();
+ }
+
+ public static KeyValue randomKeyValue(Random rand) {
+ return new KeyValue(randomRowOrQualifier(rand),
+ COLUMN_FAMILY_NAME.getBytes(), randomRowOrQualifier(rand),
+ randomValue(rand));
+ }
+
+}
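
A quick sanity sketch of the ordering guarantee that randomOrderedKey()
documents: the 32-character 'a'/'b' prefix encodes the bits of i from most
significant to least significant, so any two keys compare by prefix before a
random suffix byte is ever reached. Assuming the test class above is on the
classpath:

    Random rand = new Random(42); // arbitrary seed
    byte[] prev = TestHFileWriterV2.randomOrderedKey(rand, 0);
    for (int i = 1; i < 1000; ++i) {
      byte[] cur = TestHFileWriterV2.randomOrderedKey(rand, i);
      // Prefixes are fixed-length and strictly increasing in i, so the
      // comparison is always decided within the first 32 bytes.
      assertTrue(Bytes.compareTo(prev, cur) < 0);
      prev = cur;
    }
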