Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:19:30 UTC
svn commit: r1181555 [3/4] - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/io/hfile/
test/java/org/apache/hadoop/hbase/io/hfile/
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,307 @@
+
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Implements pretty-printing functionality for {@link HFile}s.
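+ *
+ * <p>A minimal usage sketch (the HDFS path below is hypothetical):
+ *
+ * <pre>
+ * HFilePrettyPrinter printer = new HFilePrettyPrinter();
+ * int exitCode = printer.run(new String[] {
+ *     "-v", "-p", "-f", "hdfs://namenode:9000/hbase/t1/1234/cf/456" });
+ * </pre>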
+ */
+public class HFilePrettyPrinter {
+
+ private static final Log LOG = LogFactory.getLog(HFilePrettyPrinter.class);
+
+ private Options options = new Options();
+
+ private boolean verbose;
+ private boolean printValue;
+ private boolean printKey;
+ private boolean shouldPrintMeta;
+ private boolean printBlocks;
+ private boolean checkRow;
+ private boolean checkFamily;
+
+ private Configuration conf;
+
+ private List<Path> files = new ArrayList<Path>();
+ private int count;
+
+ private static final String FOUR_SPACES = "    ";
+
+ public HFilePrettyPrinter() {
+ options.addOption("v", "verbose", false,
+ "Verbose output; emits file and meta data delimiters");
+ options.addOption("p", "printkv", false, "Print key/value pairs");
+ options.addOption("e", "printkey", false, "Print keys");
+ options.addOption("m", "printmeta", false, "Print meta data of file");
+ options.addOption("b", "printblocks", false, "Print block index meta data");
+ options.addOption("k", "checkrow", false,
+ "Enable row order check; looks for out-of-order keys");
+ options.addOption("a", "checkfamily", false, "Enable family check");
+ options.addOption("f", "file", true,
+ "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
+ options.addOption("r", "region", true,
+ "Region to scan. Pass region name; e.g. '.META.,,1'");
+ }
+
+ public boolean parseOptions(String args[]) throws ParseException,
+ IOException {
+ if (args.length == 0) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("HFile", options, true);
+ return false;
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ verbose = cmd.hasOption("v");
+ printValue = cmd.hasOption("p");
+ printKey = cmd.hasOption("e") || printValue;
+ shouldPrintMeta = cmd.hasOption("m");
+ printBlocks = cmd.hasOption("b");
+ checkRow = cmd.hasOption("k");
+ checkFamily = cmd.hasOption("a");
+
+ if (cmd.hasOption("f")) {
+ files.add(new Path(cmd.getOptionValue("f")));
+ }
+
+ if (cmd.hasOption("r")) {
+ String regionName = cmd.getOptionValue("r");
+ byte[] rn = Bytes.toBytes(regionName);
+ byte[][] hri = HRegionInfo.parseRegionName(rn);
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
+ String enc = HRegionInfo.encodeRegionName(rn);
+ Path regionDir = new Path(tableDir, enc);
+ if (verbose)
+ System.out.println("region dir -> " + regionDir);
+ List<Path> regionFiles = HFile.getStoreFiles(FileSystem.get(conf),
+ regionDir);
+ if (verbose) {
+ System.out.println("Number of region files found -> "
+ + regionFiles.size());
+ int i = 1;
+ for (Path p : regionFiles) {
+ System.out.println("Found file[" + i++ + "] -> " + p);
+ }
+ }
+ files.addAll(regionFiles);
+ }
+
+ return true;
+ }
+
+ /**
+ * Runs the command-line pretty-printer, and returns the desired command
+ * exit code (zero for success, non-zero for failure).
+ */
+ public int run(String[] args) {
+ conf = HBaseConfiguration.create();
+ conf.set("fs.defaultFS",
+ conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+
+ try {
+ if (!parseOptions(args))
+ return 1;
+ } catch (IOException ex) {
+ LOG.error("Error parsing command-line options", ex);
+ return 1;
+ } catch (ParseException ex) {
+ LOG.error("Error parsing command-line options", ex);
+ return 1;
+ }
+
+ // iterate over all files found
+ for (Path fileName : files) {
+ try {
+ processFile(fileName);
+ } catch (IOException ex) {
+ LOG.error("Error reading " + fileName, ex);
+ }
+ }
+
+ if (verbose || printKey) {
+ System.out.println("Scanned kv count -> " + count);
+ }
+
+ return 0;
+ }
+
+ private void processFile(Path file) throws IOException {
+ if (verbose)
+ System.out.println("Scanning -> " + file);
+ FileSystem fs = file.getFileSystem(conf);
+ if (!fs.exists(file)) {
+ System.err.println("ERROR, file doesn't exist: " + file);
+ return;
+ }
+
+ HFile.Reader reader = HFile.createReader(fs, file, null, false, false);
+
+ Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+
+ if (printKey || checkRow || checkFamily) {
+
+ // scan over file and read key/value's and check if requested
+ HFileScanner scanner = reader.getScanner(false, false, false);
+ scanner.seekTo();
+ scanKeysValues(file, scanner);
+ }
+
+ // print meta data
+ if (shouldPrintMeta) {
+ printMeta(reader, fileInfo);
+ }
+
+ if (printBlocks) {
+ System.out.println("Block Index:");
+ System.out.println(reader.getDataBlockIndexReader());
+ }
+
+ reader.close();
+ }
+
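+ /**
+ * Scans all key/values with the given scanner, optionally printing them
+ * and checking row ordering and column family consistency.
+ */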
+ private void scanKeysValues(Path file, HFileScanner scanner)
+ throws IOException {
+ KeyValue pkv = null;
+ do {
+ KeyValue kv = scanner.getKeyValue();
+ // dump key value
+ if (printKey) {
+ System.out.print("K: " + kv);
+ if (printValue) {
+ System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
+ }
+ System.out.println();
+ }
+ // check if rows are in order
+ if (checkRow && pkv != null) {
+ if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
+ System.err.println("WARNING, previous row is greater then"
+ + " current row\n\tfilename -> " + file + "\n\tprevious -> "
+ + Bytes.toStringBinary(pkv.getKey()) + "\n\tcurrent -> "
+ + Bytes.toStringBinary(kv.getKey()));
+ }
+ }
+ // check if families are consistent
+ if (checkFamily) {
+ String fam = Bytes.toString(kv.getFamily());
+ if (!file.toString().contains(fam)) {
+ System.err.println("WARNING, filename does not match kv family,"
+ + "\n\tfilename -> " + file + "\n\tkeyvalue -> "
+ + Bytes.toStringBinary(kv.getKey()));
+ }
+ if (pkv != null
+ && Bytes.compareTo(pkv.getFamily(), kv.getFamily()) != 0) {
+ System.err.println("WARNING, previous kv has different family"
+ + " compared to current key\n\tfilename -> " + file
+ + "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey())
+ + "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
+ }
+ }
+ pkv = kv;
+ ++count;
+ } while (scanner.next());
+ }
+
+ /**
+ * Format a string of the form "k1=v1, k2=v2, ..." into separate lines
+ * with a four-space indentation.
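+ * For example, "a=1, b=2" becomes "a=1," followed by an indented "b=2" on
+ * the next line.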
+ */
+ private static String asSeparateLines(String keyValueStr) {
+ return keyValueStr.replaceAll(", ([a-zA-Z]+=)",
+ ",\n" + FOUR_SPACES + "$1");
+ }
+
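+ /**
+ * Prints the trailer, file info entries, mid-key, and Bloom filter
+ * metadata of the given reader to stdout.
+ */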
+ private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo)
+ throws IOException {
+ System.out.println("Block index size as per heapsize: "
+ + reader.indexSize());
+ System.out.println(asSeparateLines(reader.toString()));
+ System.out.println("Trailer:\n "
+ + asSeparateLines(reader.getTrailer().toString()));
+ System.out.println("Fileinfo:");
+ for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
+ System.out.print(FOUR_SPACES + Bytes.toString(e.getKey()) + " = ");
+ if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY")) == 0) {
+ long seqid = Bytes.toLong(e.getValue());
+ System.out.println(seqid);
+ } else if (Bytes.compareTo(e.getKey(), Bytes.toBytes("TIMERANGE")) == 0) {
+ TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+ Writables.copyWritable(e.getValue(), timeRangeTracker);
+ System.out.println(timeRangeTracker.getMinimumTimestamp() + "...."
+ + timeRangeTracker.getMaximumTimestamp());
+ } else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0
+ || Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
+ System.out.println(Bytes.toInt(e.getValue()));
+ } else {
+ System.out.println(Bytes.toStringBinary(e.getValue()));
+ }
+ }
+
+ System.out.println("Mid-key: " + Bytes.toStringBinary(reader.midkey()));
+
+ // Printing bloom information
+ DataInput bloomMeta = reader.getBloomFilterMetadata();
+ BloomFilter bloomFilter = null;
+ if (bloomMeta != null)
+ bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
+
+ System.out.println("Bloom filter:");
+ if (bloomFilter != null) {
+ System.out.println(FOUR_SPACES + bloomFilter.toString().replaceAll(
+ ByteBloomFilter.STATS_RECORD_SEP, "\n" + FOUR_SPACES));
+ } else {
+ System.out.println(FOUR_SPACES + "Not present");
+ }
+ }
+
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,694 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * {@link HFile} reader for version 1.
+ */
+public class HFileReaderV1 extends AbstractHFileReader {
+ private static final Log LOG = LogFactory.getLog(HFileReaderV1.class);
+
+ private volatile boolean fileInfoLoaded = false;
+
+ /**
+ * Opens an HFile. You must load the index before you can
+ * use it by calling {@link #loadFileInfo()}.
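+ *
+ * <p>A minimal open-and-read sketch (stream and trailer setup elided):
+ *
+ * <pre>
+ * HFileReaderV1 reader = new HFileReaderV1(path, trailer, fsdis, size,
+ *     true, null, false, false);
+ * reader.loadFileInfo(); // must be called before reading keys
+ * </pre>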
+ *
+ * @param fsdis input stream. Caller is responsible for closing the passed
+ * stream.
+ * @param size Length of the stream.
+ * @param blockCache block cache. Pass null if none.
+ * @param inMemory whether blocks should be marked as in-memory in cache
+ * @param evictOnClose whether blocks in cache should be evicted on close
+ * @throws IOException
+ */
+ public HFileReaderV1(Path path, FixedFileTrailer trailer,
+ final FSDataInputStream fsdis, final long size,
+ final boolean closeIStream,
+ final BlockCache blockCache, final boolean inMemory,
+ final boolean evictOnClose) {
+ super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
+ evictOnClose);
+
+ trailer.expectVersion(1);
+ fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
+ }
+
+ private byte[] readAllIndex(final FSDataInputStream in,
+ final long indexOffset, final int indexSize) throws IOException {
+ byte[] allIndex = new byte[indexSize];
+ in.seek(indexOffset);
+ IOUtils.readFully(in, allIndex, 0, allIndex.length);
+
+ return allIndex;
+ }
+
+ /**
+ * Read in the index and file info.
+ *
+ * @return A map of fileinfo data.
+ * @see Writer#appendFileInfo(byte[], byte[])
+ * @throws IOException
+ */
+ @Override
+ public FileInfo loadFileInfo() throws IOException {
+ if (fileInfoLoaded)
+ return fileInfo;
+
+ // Read in the fileinfo and get what we need from it.
+ istream.seek(trailer.getFileInfoOffset());
+ fileInfo = new FileInfo();
+ fileInfo.readFields(istream);
+ lastKey = fileInfo.get(FileInfo.LASTKEY);
+ avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
+ avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+
+ // Comparator is stored in the file info in version 1.
+ String clazzName = Bytes.toString(fileInfo.get(FileInfo.COMPARATOR));
+ comparator = getComparator(clazzName);
+
+ dataBlockIndexReader =
+ new HFileBlockIndex.BlockIndexReader(comparator, 1);
+ metaBlockIndexReader =
+ new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
+
+ int sizeToLoadOnOpen = (int) (fileSize - trailer.getLoadOnOpenDataOffset() -
+ trailer.getTrailerSize());
+ byte[] dataAndMetaIndex = readAllIndex(istream,
+ trailer.getLoadOnOpenDataOffset(), sizeToLoadOnOpen);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
+ DataInputStream dis = new DataInputStream(bis);
+
+ // Read in the data index.
+ if (trailer.getDataIndexCount() > 0)
+ BlockType.INDEX_V1.readAndCheck(dis);
+ dataBlockIndexReader.readRootIndex(dis, trailer.getDataIndexCount());
+
+ // Read in the metadata index.
+ if (trailer.getMetaIndexCount() > 0)
+ BlockType.INDEX_V1.readAndCheck(dis);
+ metaBlockIndexReader.readRootIndex(dis, trailer.getMetaIndexCount());
+
+ fileInfoLoaded = true;
+ return fileInfo;
+ }
+
+ /**
+ * Creates comparator from the given class name.
+ *
+ * @param clazzName the comparator class name read from the trailer
+ * @return an instance of the comparator to use
+ * @throws IOException if the comparator class name is invalid
+ */
+ @SuppressWarnings("unchecked")
+ private RawComparator<byte[]> getComparator(final String clazzName)
+ throws IOException {
+ if (clazzName == null || clazzName.length() == 0) {
+ return null;
+ }
+ try {
+ return (RawComparator<byte[]>)Class.forName(clazzName).newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Create a Scanner on this file. No seeks or reads are done on creation. Call
+ * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+ * nothing to clean up in a Scanner. Letting go of your references to the
+ * scanner is sufficient.
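+ *
+ * <p>A minimal full-scan sketch:
+ *
+ * <pre>
+ * HFileScanner scanner = reader.getScanner(false, true, false);
+ * if (scanner.seekTo()) {
+ *   do {
+ *     KeyValue kv = scanner.getKeyValue();
+ *   } while (scanner.next());
+ * }
+ * </pre>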
+ *
+ * @param cacheBlocks True if we should cache blocks read in by this scanner.
+ * @param pread Use positional read rather than seek+read if true (pread is
+ * better for random reads, seek+read is better for scanning).
+ * @param isCompaction is scanner being used for a compaction?
+ * @return Scanner on this file.
+ */
+ @Override
+ public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+ final boolean isCompaction) {
+ return new ScannerV1(this, cacheBlocks, pread, isCompaction);
+ }
+
+ /**
+ * @param key Key to search.
+ * @return Block number of the block containing the key or -1 if not in this
+ * file.
+ */
+ protected int blockContainingKey(final byte[] key, int offset, int length) {
+ if (dataBlockIndexReader.isEmpty()) {
+ throw new RuntimeException("Block index not loaded");
+ }
+ return dataBlockIndexReader.rootBlockContainingKey(key, offset, length);
+ }
+
+ /**
+ * @param metaBlockName
+ * @param cacheBlock Add block to cache, if found
+ * @return Block wrapped in a ByteBuffer
+ * @throws IOException
+ */
+ @Override
+ public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
+ throws IOException {
+ if (trailer.getMetaIndexCount() == 0) {
+ return null; // there are no meta blocks
+ }
+ if (metaBlockIndexReader == null) {
+ throw new IOException("Meta index not loaded");
+ }
+
+ byte[] nameBytes = Bytes.toBytes(metaBlockName);
+ int block = metaBlockIndexReader.rootBlockContainingKey(nameBytes, 0,
+ nameBytes.length);
+ if (block == -1)
+ return null;
+ long offset = metaBlockIndexReader.getRootBlockOffset(block);
+ long nextOffset;
+ if (block == metaBlockIndexReader.getRootBlockCount() - 1) {
+ nextOffset = trailer.getFileInfoOffset();
+ } else {
+ nextOffset = metaBlockIndexReader.getRootBlockOffset(block + 1);
+ }
+
+ long now = System.currentTimeMillis();
+
+ String cacheKey = HFile.getBlockCacheKey(name, offset);
+
+ // Per meta key from any given file, synchronize reads for said block
+ synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+ metaLoads++;
+ HRegion.incrNumericMetric(this.fsMetaBlockReadCntMetric, 1);
+ // Check cache for block. If found return.
+ if (blockCache != null) {
+ HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+ if (cachedBlock != null) {
+ cacheHits++;
+ HRegion.incrNumericMetric(this.fsMetaBlockReadCacheHitCntMetric, 1);
+ return cachedBlock.getBufferWithoutHeader();
+ }
+ // Cache Miss, please load.
+ }
+
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
+ nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
+ true);
+ hfileBlock.expectType(BlockType.META);
+
+ long delta = System.currentTimeMillis() - now;
+ HRegion.incrTimeVaryingMetric(this.fsReadTimeMetric, delta);
+ HFile.readTime += delta;
+ HFile.readOps++;
+
+ // Cache the block
+ if (cacheBlock && blockCache != null) {
+ blockCache.cacheBlock(cacheKey, hfileBlock, inMemory);
+ }
+
+ return hfileBlock.getBufferWithoutHeader();
+ }
+ }
+
+ /**
+ * Read in a file block.
+ * @param block Index of block to read.
+ * @param pread Use positional read instead of seek+read (positional is
+ * better for random reads whereas seek+read is better for scanning).
+ * @param isCompaction is this block being read as part of a compaction
+ * @return Block wrapped in a ByteBuffer.
+ * @throws IOException
+ */
+ ByteBuffer readBlockBuffer(int block, boolean cacheBlock,
+ final boolean pread, final boolean isCompaction) throws IOException {
+ if (dataBlockIndexReader == null) {
+ throw new IOException("Block index not loaded");
+ }
+ if (block < 0 || block >= dataBlockIndexReader.getRootBlockCount()) {
+ throw new IOException("Requested block is out of range: " + block +
+ ", max: " + dataBlockIndexReader.getRootBlockCount());
+ }
+
+ long offset = dataBlockIndexReader.getRootBlockOffset(block);
+ String cacheKey = HFile.getBlockCacheKey(name, offset);
+
+ // For any given block from any given file, synchronize reads for said
+ // block.
+ // Without a cache, this synchronizing is needless overhead, but really
+ // the other choice is to duplicate work (which the cache would prevent you
+ // from doing).
+ synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
+ blockLoads++;
+
+ if (isCompaction) {
+ HRegion.incrNumericMetric(this.compactionBlockReadCntMetric, 1);
+ } else {
+ HRegion.incrNumericMetric(this.fsBlockReadCntMetric, 1);
+ }
+
+ // Check cache for block. If found return.
+ if (blockCache != null) {
+ HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+ if (cachedBlock != null) {
+ cacheHits++;
+
+ if (isCompaction) {
+ HRegion.incrNumericMetric(
+ this.compactionBlockReadCacheHitCntMetric, 1);
+ } else {
+ HRegion.incrNumericMetric(
+ this.fsBlockReadCacheHitCntMetric, 1);
+ }
+
+ return cachedBlock.getBufferWithoutHeader();
+ }
+ // Carry on, please load.
+ }
+
+ // Load block from filesystem.
+ long now = System.currentTimeMillis();
+ long nextOffset;
+
+ if (block == dataBlockIndexReader.getRootBlockCount() - 1) {
+ // Last block! The end of the data blocks is the first meta block if
+ // there is one, or the fileinfo offset if there isn't.
+ nextOffset = (metaBlockIndexReader.getRootBlockCount() == 0) ?
+ this.trailer.getFileInfoOffset() :
+ metaBlockIndexReader.getRootBlockOffset(0);
+ } else {
+ nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
+ }
+
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
+ - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
+ hfileBlock.expectType(BlockType.DATA);
+ ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
+
+ long delta = System.currentTimeMillis() - now;
+ HFile.readTime += delta;
+ HFile.readOps++;
+ if (isCompaction) {
+ HRegion.incrTimeVaryingMetric(this.compactionReadTimeMetric, delta);
+ } else {
+ HRegion.incrTimeVaryingMetric(this.fsReadTimeMetric, delta);
+ }
+
+ // Cache the block
+ if (cacheBlock && blockCache != null) {
+ blockCache.cacheBlock(cacheKey, hfileBlock, inMemory);
+ }
+
+ return buf;
+ }
+ }
+
+ /**
+ * @return Last key in the file. May be null if file has no entries.
+ * Note that this is not the last rowkey, but rather the byte form of
+ * the last KeyValue.
+ */
+ public byte[] getLastKey() {
+ if (!fileInfoLoaded) {
+ throw new RuntimeException("Load file info first");
+ }
+ return dataBlockIndexReader.isEmpty() ? null : lastKey;
+ }
+
+ /**
+ * @return Midkey for this file. We work with block boundaries only so
+ * returned midkey is an approximation only.
+ *
+ * @throws IOException
+ */
+ @Override
+ public byte[] midkey() throws IOException {
+ if (!isFileInfoLoaded() || dataBlockIndexReader.isEmpty()) {
+ return null;
+ }
+ return dataBlockIndexReader.midkey();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (evictOnClose && this.blockCache != null) {
+ int numEvicted = 0;
+ for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
+ if (blockCache.evictBlock(HFile.getBlockCacheKey(name,
+ dataBlockIndexReader.getRootBlockOffset(i))))
+ numEvicted++;
+ }
+ LOG.debug("On close of file " + name + " evicted " + numEvicted
+ + " block(s) of " + dataBlockIndexReader.getRootBlockCount()
+ + " total blocks");
+ }
+ if (this.closeIStream && this.istream != null) {
+ this.istream.close();
+ this.istream = null;
+ }
+ }
+
+ /**
+ * Implementation of {@link HFileScanner} interface.
+ */
+ protected static class ScannerV1 extends AbstractHFileReader.Scanner {
+ private final HFileReaderV1 reader;
+ private int currBlock;
+
+ public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ this.reader = reader;
+ this.cacheBlocks = cacheBlocks;
+ this.pread = pread;
+ this.isCompaction = isCompaction;
+ }
+
+ @Override
+ public KeyValue getKeyValue() {
+ if (blockBuffer == null) {
+ return null;
+ }
+ return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position() - 8);
+ }
+
+ @Override
+ public ByteBuffer getKey() {
+ if (blockBuffer == null || currKeyLen == 0) {
+ throw new RuntimeException(
+ "you need to seekTo() before calling getKey()");
+ }
+ ByteBuffer keyBuff = blockBuffer.slice();
+ keyBuff.limit(currKeyLen);
+ keyBuff.rewind();
+ // Do keyBuff.asReadOnlyBuffer()?
+ return keyBuff;
+ }
+
+ @Override
+ public ByteBuffer getValue() {
+ if (blockBuffer == null || currKeyLen == 0) {
+ throw new RuntimeException(
+ "you need to seekTo() before calling getValue()");
+ }
+
+ // TODO: Could this be done with one ByteBuffer rather than create two?
+ ByteBuffer valueBuff = blockBuffer.slice();
+ valueBuff.position(currKeyLen);
+ valueBuff = valueBuff.slice();
+ valueBuff.limit(currValueLen);
+ valueBuff.rewind();
+ return valueBuff;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ // LOG.deug("rem:" + block.remaining() + " p:" + block.position() +
+ // " kl: " + currKeyLen + " kv: " + currValueLen);
+ if (blockBuffer == null) {
+ throw new IOException("Next called on non-seeked scanner");
+ }
+
+ try {
+ blockBuffer.position(blockBuffer.position() + currKeyLen
+ + currValueLen);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Current pos = " + blockBuffer.position() +
+ "; currKeyLen = " + currKeyLen +
+ "; currValLen = " + currValueLen +
+ "; block limit = " + blockBuffer.limit() +
+ "; HFile name = " + reader.getName() +
+ "; currBlock id = " + currBlock);
+ throw e;
+ }
+ if (blockBuffer.remaining() <= 0) {
+ // LOG.debug("Fetch next block");
+ currBlock++;
+ if (currBlock >= reader.getDataBlockIndexReader().getRootBlockCount()) {
+ // damn we are at the end
+ currBlock = 0;
+ blockBuffer = null;
+ return false;
+ }
+ blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
+ isCompaction);
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ blockFetches++;
+ return true;
+ }
+
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ return true;
+ }
+
+ @Override
+ public int seekTo(byte[] key) throws IOException {
+ return seekTo(key, 0, key.length);
+ }
+
+ @Override
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ int b = reader.blockContainingKey(key, offset, length);
+ if (b < 0) return -1; // falls before the beginning of the file! :-(
+ // Avoid re-reading the same block (that'd be dumb).
+ loadBlock(b, true);
+ return blockSeek(key, offset, length, false);
+ }
+
+ @Override
+ public int reseekTo(byte[] key) throws IOException {
+ return reseekTo(key, 0, key.length);
+ }
+
+ @Override
+ public int reseekTo(byte[] key, int offset, int length)
+ throws IOException {
+ if (blockBuffer != null && currKeyLen != 0) {
+ ByteBuffer bb = getKey();
+ int compared = reader.getComparator().compare(key, offset,
+ length, bb.array(), bb.arrayOffset(), bb.limit());
+ if (compared < 1) {
+ // If the required key is less than or equal to current key, then
+ // don't do anything.
+ return compared;
+ }
+ }
+
+ int b = reader.blockContainingKey(key, offset, length);
+ if (b < 0) {
+ return -1;
+ }
+ loadBlock(b, false);
+ return blockSeek(key, offset, length, false);
+ }
+
+ /**
+ * Within a loaded block, seek to the last key that is less than or equal
+ * to the key we are interested in.
+ *
+ * A note on seekBefore: if seekBefore is true AND the first key in the
+ * block equals the given key, an exception is thrown.
+ * @param key the key to find
+ * @param seekBefore find the key before the exact match
+ * @return 0 for an exact match, 1 otherwise
+ */
+ private int blockSeek(byte[] key, int offset, int length,
+ boolean seekBefore) {
+ int klen, vlen;
+ int lastLen = 0;
+ do {
+ klen = blockBuffer.getInt();
+ vlen = blockBuffer.getInt();
+ int comp = reader.getComparator().compare(key, offset, length,
+ blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position(), klen);
+ if (comp == 0) {
+ if (seekBefore) {
+ blockBuffer.position(blockBuffer.position() - lastLen - 16);
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ return 1; // non exact match.
+ }
+ currKeyLen = klen;
+ currValueLen = vlen;
+ return 0; // indicate exact match
+ }
+ if (comp < 0) {
+ // go back one key:
+ blockBuffer.position(blockBuffer.position() - lastLen - 16);
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ return 1;
+ }
+ blockBuffer.position(blockBuffer.position() + klen + vlen);
+ lastLen = klen + vlen;
+ } while (blockBuffer.remaining() > 0);
+
+ // Ok, we are at the end, so go back to the start of the last entry. The
+ // 8 here is intentionally different from the 16s above: above we also
+ // rewind over the 8 bytes of length fields just read for the current
+ // entry, while here only the last entry's own length fields precede its
+ // data.
+ blockBuffer.position(blockBuffer.position() - lastLen - 8);
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ return 1; // didn't exactly find it.
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key) throws IOException {
+ return seekBefore(key, 0, key.length);
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key, int offset, int length)
+ throws IOException {
+ int b = reader.blockContainingKey(key, offset, length);
+ if (b < 0)
+ return false; // key is before the start of the file.
+
+ // Question: does this block begin with 'key'?
+ byte[] firstKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
+ if (reader.getComparator().compare(firstKey, 0, firstKey.length,
+ key, offset, length) == 0) {
+ // Ok the key we're interested in is the first of the block, so go back
+ // by one.
+ if (b == 0) {
+ // we have a 'problem', the key we want is the first of the file.
+ return false;
+ }
+ b--;
+ // TODO shortcut: seek forward in this block to the last key of the
+ // block.
+ }
+ loadBlock(b, true);
+ blockSeek(key, offset, length, true);
+ return true;
+ }
+
+ @Override
+ public String getKeyString() {
+ return Bytes.toStringBinary(blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
+ }
+
+ @Override
+ public String getValueString() {
+ return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() +
+ blockBuffer.position() + currKeyLen, currValueLen);
+ }
+
+ @Override
+ public Reader getReader() {
+ return reader;
+ }
+
+ @Override
+ public boolean seekTo() throws IOException {
+ if (reader.getDataBlockIndexReader().isEmpty()) {
+ return false;
+ }
+ if (blockBuffer != null && currBlock == 0) {
+ blockBuffer.rewind();
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ return true;
+ }
+ currBlock = 0;
+ blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
+ isCompaction);
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ blockFetches++;
+ return true;
+ }
+
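+ /**
+ * Makes the block at the given index current. If it already is, either
+ * rewinds the buffer or backs up over the two length fields just read,
+ * depending on the rewind flag.
+ */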
+ private void loadBlock(int bloc, boolean rewind) throws IOException {
+ if (blockBuffer == null) {
+ blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
+ isCompaction);
+ currBlock = bloc;
+ blockFetches++;
+ } else {
+ if (bloc != currBlock) {
+ blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
+ isCompaction);
+ currBlock = bloc;
+ blockFetches++;
+ } else {
+ // we are already in the same block, just rewind to seek again.
+ if (rewind) {
+ blockBuffer.rewind();
+ }
+ else {
+ // Go back by (size of rowlength + size of valuelength) = 8 bytes
+ blockBuffer.position(blockBuffer.position()-8);
+ }
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public HFileBlock readBlock(long offset, int onDiskBlockSize,
+ boolean cacheBlock, boolean pread, boolean isCompaction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataInput getBloomFilterMetadata() throws IOException {
+ ByteBuffer buf = getMetaBlock(HFileWriterV1.BLOOM_FILTER_META_KEY, false);
+ if (buf == null)
+ return null;
+ ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),
+ buf.arrayOffset(), buf.limit());
+ return new DataInputStream(bais);
+ }
+
+ @Override
+ public boolean isFileInfoLoaded() {
+ return fileInfoLoaded;
+ }
+
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,721 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.IdLock;
+
+/**
+ * {@link HFile} reader for version 2.
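+ * Unlike version 1, the comparator class name is stored in the trailer,
+ * the data block index may be multi-level, and the file info and indexes
+ * are read from the load-on-open section in the constructor.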
+ */
+public class HFileReaderV2 extends AbstractHFileReader implements
+ HFileBlock.BasicReader {
+
+ private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
+
+ /**
+ * The size of a (key length, value length) tuple that prefixes each entry in
+ * a data block.
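+ * Each entry is laid out as (key length: int)(value length: int)(key
+ * bytes)(value bytes), so this prefix is 8 bytes.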
+ */
+ private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+ /**
+ * A "sparse lock" implementation allowing to lock on a particular block
+ * identified by offset. The purpose of this is to avoid two clients loading
+ * the same block, and have all but one client wait to get the block from the
+ * cache.
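+ *
+ * <p>Typical usage, as in {@link #readBlock} below: acquire the per-offset
+ * lock entry with getLockEntry(offset), then release it in a finally block
+ * with releaseLockEntry(entry).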
+ */
+ private IdLock offsetLock = new IdLock();
+
+ /**
+ * Blocks read from the load-on-open section, excluding data root index, meta
+ * index, and file info.
+ */
+ private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
+
+ /**
+ * Opens an HFile. Unlike version 1, the file info and block indexes are
+ * read in the constructor, so the reader is usable as soon as it is
+ * constructed.
+ *
+ * @param fsdis input stream. Caller is responsible for closing the passed
+ * stream.
+ * @param size Length of the stream.
+ * @param blockCache block cache. Pass null if none.
+ * @param inMemory whether blocks should be marked as in-memory in cache
+ * @param evictOnClose whether blocks in cache should be evicted on close
+ * @throws IOException
+ */
+ public HFileReaderV2(Path path, FixedFileTrailer trailer,
+ final FSDataInputStream fsdis, final long size,
+ final boolean closeIStream, final BlockCache blockCache,
+ final boolean inMemory, final boolean evictOnClose) throws IOException {
+ super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
+ evictOnClose);
+
+ trailer.expectVersion(2);
+ fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
+ fileSize);
+
+ // Comparator class name is stored in the trailer in version 2.
+ comparator = trailer.createComparator();
+ dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+ trailer.getNumDataIndexLevels(), this);
+ metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+ Bytes.BYTES_RAWCOMPARATOR, 1);
+
+ // Parse load-on-open data.
+
+ HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
+ trailer.getLoadOnOpenDataOffset(),
+ fileSize - trailer.getTrailerSize());
+
+ // Data index. We also read statistics about the block index written after
+ // the root level.
+ dataBlockIndexReader.readMultiLevelIndexRoot(
+ blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+ trailer.getDataIndexCount());
+
+ // Meta index.
+ metaBlockIndexReader.readRootIndex(
+ blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+ trailer.getMetaIndexCount());
+
+ // File info
+ fileInfo = new FileInfo();
+ fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
+ lastKey = fileInfo.get(FileInfo.LASTKEY);
+ avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
+ avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+
+ // Store all other load-on-open blocks for further consumption.
+ HFileBlock b;
+ while ((b = blockIter.nextBlock()) != null) {
+ loadOnOpenBlocks.add(b);
+ }
+ }
+
+ /**
+ * Create a Scanner on this file. No seeks or reads are done on creation. Call
+ * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+ * nothing to clean up in a Scanner. Letting go of your references to the
+ * scanner is sufficient.
+ *
+ * @param cacheBlocks True if we should cache blocks read in by this scanner.
+ * @param pread Use positional read rather than seek+read if true (pread is
+ * better for random reads, seek+read is better for scanning).
+ * @param isCompaction is scanner being used for a compaction?
+ * @return Scanner on this file.
+ */
+ public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+ final boolean isCompaction) {
+ return new ScannerV2(this, cacheBlocks, pread, isCompaction);
+ }
+
+ /**
+ * @param metaBlockName
+ * @param cacheBlock Add block to cache, if found
+ * @return block wrapped in a ByteBuffer, with header skipped
+ * @throws IOException
+ */
+ public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
+ throws IOException {
+ if (trailer.getMetaIndexCount() == 0) {
+ return null; // there are no meta blocks
+ }
+ if (metaBlockIndexReader == null) {
+ throw new IOException("Meta index not loaded");
+ }
+
+ byte[] mbname = Bytes.toBytes(metaBlockName);
+ int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
+ mbname.length);
+ if (block == -1)
+ return null;
+ long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
+ long now = System.currentTimeMillis();
+
+ // Per meta key from any given file, synchronize reads for said block. This
+ // is OK to do for meta blocks because the meta block index is always
+ // single-level.
+ synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+ metaLoads++;
+ HRegion.incrNumericMetric(fsMetaBlockReadCntMetric, 1);
+
+ // Check cache for block. If found return.
+ long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+ String cacheKey = HFile.getBlockCacheKey(name, metaBlockOffset);
+
+ if (blockCache != null) {
+ HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+ if (cachedBlock != null) {
+ // Return a distinct shallow copy of the block's buffer, so the
+ // scanner does not disturb the cached block's position.
+ cacheHits++;
+ HRegion.incrNumericMetric(fsMetaBlockReadCacheHitCntMetric, 1);
+ return cachedBlock.getBufferWithoutHeader();
+ }
+ // Cache Miss, please load.
+ }
+
+ HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
+ blockSize, -1, true);
+
+ long delta = System.currentTimeMillis() - now;
+ HRegion.incrTimeVaryingMetric(fsReadTimeMetric, delta);
+ HFile.readTime += delta;
+ HFile.readOps++;
+
+ // Cache the block
+ if (cacheBlock && blockCache != null) {
+ blockCache.cacheBlock(cacheKey, metaBlock, inMemory);
+ }
+
+ return metaBlock.getBufferWithoutHeader();
+ }
+ }
+
+ @Override
+ public HFileBlock readBlockData(long offset, long onDiskSize,
+ int uncompressedSize, boolean pread) throws IOException {
+ if (onDiskSize >= Integer.MAX_VALUE) {
+ throw new IOException("Invalid on-disk size: " + onDiskSize);
+ }
+ return readBlock(offset, (int) onDiskSize, true, pread, false);
+ }
+
+ /**
+ * Read in a file block.
+ *
+ * @param dataBlockOffset offset to read.
+ * @param onDiskSize size of the block
+ * @param pread Use positional read instead of seek+read (positional is better
+ * for random reads whereas seek+read is better for scanning).
+ * @param isCompaction is this block being read as part of a compaction
+ * @return Block wrapped in a ByteBuffer.
+ * @throws IOException
+ */
+ public HFileBlock readBlock(long dataBlockOffset, int onDiskBlockSize,
+ boolean cacheBlock, final boolean pread, final boolean isCompaction)
+ throws IOException {
+ if (dataBlockIndexReader == null) {
+ throw new IOException("Block index not loaded");
+ }
+ if (dataBlockOffset < 0
+ || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
+ throw new IOException("Requested block is out of range: "
+ + dataBlockOffset + ", lastDataBlockOffset: "
+ + trailer.getLastDataBlockOffset());
+ }
+ // For any given block from any given file, synchronize reads for said
+ // block.
+ // Without a cache, this synchronizing is needless overhead, but really
+ // the other choice is to duplicate work (which the cache would prevent you
+ // from doing).
+
+ String cacheKey = HFile.getBlockCacheKey(name, dataBlockOffset);
+ IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
+ try {
+ blockLoads++;
+
+ if (isCompaction) {
+ HRegion.incrNumericMetric(compactionBlockReadCntMetric, 1);
+ } else {
+ HRegion.incrNumericMetric(fsBlockReadCntMetric, 1);
+ }
+
+ // Check cache for block. If found return.
+ if (blockCache != null) {
+ HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+ if (cachedBlock != null) {
+ cacheHits++;
+
+ if (isCompaction) {
+ HRegion.incrNumericMetric(
+ compactionBlockReadCacheHitCntMetric, 1);
+ } else {
+ HRegion.incrNumericMetric(fsBlockReadCacheHitCntMetric, 1);
+ }
+ return cachedBlock;
+ }
+ // Carry on, please load.
+ }
+
+ // Load block from filesystem.
+ long now = System.currentTimeMillis();
+ HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset,
+ onDiskBlockSize, -1, true);
+
+ long delta = System.currentTimeMillis() - now;
+ HFile.readTime += delta;
+ HFile.readOps++;
+ if (isCompaction) {
+ HRegion.incrTimeVaryingMetric(compactionReadTimeMetric, delta);
+ } else {
+ HRegion.incrTimeVaryingMetric(fsReadTimeMetric, delta);
+ }
+
+ // Cache the block
+ if (cacheBlock && blockCache != null) {
+ blockCache.cacheBlock(cacheKey, dataBlock, inMemory);
+ }
+
+ return dataBlock;
+ } finally {
+ offsetLock.releaseLockEntry(lockEntry);
+ }
+ }
+
+ /**
+ * @return Last key in the file. May be null if file has no entries. Note that
+ * this is not the last row key, but rather the byte form of the last
+ * KeyValue.
+ */
+ @Override
+ public byte[] getLastKey() {
+ return dataBlockIndexReader.isEmpty() ? null : lastKey;
+ }
+
+ /**
+ * @return Midkey for this file. We work with block boundaries only so
+ * returned midkey is an approximation only.
+ * @throws IOException
+ */
+ @Override
+ public byte[] midkey() throws IOException {
+ return dataBlockIndexReader.midkey();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (evictOnClose && blockCache != null) {
+ int numEvicted = blockCache.evictBlocksByPrefix(name
+ + HFile.CACHE_KEY_SEPARATOR);
+ LOG.debug("On close of file " + name + " evicted " + numEvicted
+ + " block(s)");
+ }
+ if (closeIStream && istream != null) {
+ istream.close();
+ istream = null;
+ }
+ }
+
+ /**
+ * Implementation of {@link HFileScanner} interface.
+ */
+ protected static class ScannerV2 extends AbstractHFileReader.Scanner {
+ private HFileBlock block;
+
+ public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ this.reader = r;
+ this.cacheBlocks = cacheBlocks;
+ this.pread = pread;
+ this.isCompaction = isCompaction;
+ }
+
+ @Override
+ public KeyValue getKeyValue() {
+ if (!isSeeked())
+ return null;
+
+ return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position());
+ }
+
+ @Override
+ public ByteBuffer getKey() {
+ assertSeeked();
+ return ByteBuffer.wrap(
+ blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
+ }
+
+ @Override
+ public ByteBuffer getValue() {
+ assertSeeked();
+ return ByteBuffer.wrap(
+ blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
+ }
+
+ private void setNonSeekedState() {
+ block = null;
+ blockBuffer = null;
+ currKeyLen = 0;
+ currValueLen = 0;
+ }
+
+ /**
+ * Go to the next key/value in the block section. Loads the next block if
+ * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
+ * be called.
+ *
+ * @return true if successfully navigated to the next key/value
+ */
+ @Override
+ public boolean next() throws IOException {
+ assertSeeked();
+
+ try {
+ blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
+ + currKeyLen + currValueLen);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Current pos = " + blockBuffer.position()
+ + "; currKeyLen = " + currKeyLen + "; currValLen = "
+ + currValueLen + "; block limit = " + blockBuffer.limit()
+ + "; HFile name = " + reader.getName()
+ + "; currBlock currBlockOffset = " + block.getOffset());
+ throw e;
+ }
+
+ if (blockBuffer.remaining() <= 0) {
+ long lastDataBlockOffset =
+ reader.getTrailer().getLastDataBlockOffset();
+
+ if (block.getOffset() >= lastDataBlockOffset) {
+ setNonSeekedState();
+ return false;
+ }
+
+ // read the next block
+ HFileBlock nextBlock = readNextDataBlock();
+ if (nextBlock == null) {
+ setNonSeekedState();
+ return false;
+ }
+
+ updateCurrBlock(nextBlock);
+ return true;
+ }
+
+ // We are still in the same block.
+ readKeyValueLen();
+ return true;
+ }
+
+ /**
+ * Scans blocks in the "scanned" section of the {@link HFile} until the next
+ * data block is found.
+ *
+ * @return the next block, or null if there are no more data blocks
+ * @throws IOException
+ */
+ private HFileBlock readNextDataBlock() throws IOException {
+ long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+ if (block == null)
+ return null;
+
+ HFileBlock curBlock = block;
+
+ do {
+ if (curBlock.getOffset() >= lastDataBlockOffset)
+ return null;
+
+ if (curBlock.getOffset() < 0) {
+ throw new IOException("Invalid block file offset: " + block);
+ }
+ curBlock = reader.readBlock(curBlock.getOffset()
+ + curBlock.getOnDiskSizeWithHeader(),
+ curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+ false);
+ } while (!curBlock.getBlockType().equals(BlockType.DATA));
+
+ return curBlock;
+ }
+
+ /**
+ * Positions this scanner at the start of the file.
+ *
+ * @return false if empty file; i.e. a call to next would return false and
+ * the current key and value are undefined.
+ * @throws IOException
+ */
+ @Override
+ public boolean seekTo() throws IOException {
+ if (reader == null) {
+ return false;
+ }
+
+ if (reader.getTrailer().getEntryCount() == 0) {
+ // No data blocks.
+ return false;
+ }
+
+ long firstDataBlockOffset =
+ reader.getTrailer().getFirstDataBlockOffset();
+ if (block != null && block.getOffset() == firstDataBlockOffset) {
+ blockBuffer.rewind();
+ readKeyValueLen();
+ return true;
+ }
+
+ block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+ isCompaction);
+ if (block.getOffset() < 0) {
+ throw new IOException("Invalid block offset: " + block.getOffset());
+ }
+ updateCurrBlock(block);
+ return true;
+ }
+
+ @Override
+ public int seekTo(byte[] key) throws IOException {
+ return seekTo(key, 0, key.length);
+ }
+
+ @Override
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ HFileBlock seekToBlock =
+ ((HFileReaderV2) reader).getDataBlockIndexReader().seekToDataBlock(
+ key, offset, length, block);
+ if (seekToBlock == null) {
+ // This happens, e.g., if the key falls before the beginning of the file.
+ return -1;
+ }
+ return loadBlockAndSeekToKey(seekToBlock, true, key, offset, length,
+ false);
+ }
+
+ @Override
+ public int reseekTo(byte[] key) throws IOException {
+ return reseekTo(key, 0, key.length);
+ }
+
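+ /**
+ * Like {@link #seekTo(byte[], int, int)}, but optimized for a scanner that
+ * is already positioned: if the requested key is at or before the current
+ * key, the scanner is left where it is.
+ */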
+ @Override
+ public int reseekTo(byte[] key, int offset, int length) throws IOException {
+ if (isSeeked()) {
+ ByteBuffer bb = getKey();
+ int compared = reader.getComparator().compare(key, offset,
+ length, bb.array(), bb.arrayOffset(), bb.limit());
+ if (compared < 1) {
+ // If the required key is less than or equal to current key, then
+ // don't do anything.
+ return compared;
+ }
+ }
+ return seekTo(key, offset, length);
+ }
+
+ private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+ byte[] key, int offset, int length, boolean seekBefore)
+ throws IOException {
+ if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+ updateCurrBlock(seekToBlock);
+ } else if (rewind) {
+ blockBuffer.rewind();
+ }
+ return blockSeek(key, offset, length, seekBefore);
+ }
+
+ /**
+ * Updates the current block to be the given {@link HFileBlock}. Seeks to
+ * the first key/value pair.
+ *
+ * @param newBlock the block to make current
+ */
+ private void updateCurrBlock(HFileBlock newBlock) {
+ block = newBlock;
+ blockBuffer = block.getBufferWithoutHeader();
+ readKeyValueLen();
+ blockFetches++;
+ }
+
+ private final void readKeyValueLen() {
+ blockBuffer.mark();
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ blockBuffer.reset();
+
+ if (currKeyLen < 0 || currValueLen < 0
+ || currKeyLen > blockBuffer.limit()
+ || currValueLen > blockBuffer.limit()) {
+ throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
+ + " or currValueLen " + currValueLen + ". Block offset: "
+ + block.getOffset() + ", block length: " + blockBuffer.limit()
+ + ", position: " + blockBuffer.position() + " (without header).");
+ }
+ }
+
+ /**
+ * Within a loaded block, seek to the last key that is less than or equal
+ * to the key we are interested in.
+ *
+ * A note on seekBefore: if seekBefore is true AND the first key in the
+ * block equals the given key, an exception is thrown. The caller has to
+ * check for that case and load the previous block as appropriate.
+ *
+ * @param key the key to find
+ * @param seekBefore find the key before the given key in case of exact
+ * match.
+ * @return 0 in case of an exact key match, 1 in case of an inexact match
+ */
+ private int blockSeek(byte[] key, int offset, int length,
+ boolean seekBefore) {
+ int klen, vlen;
+ int lastKeyValueSize = -1;
+ do {
+ blockBuffer.mark();
+ klen = blockBuffer.getInt();
+ vlen = blockBuffer.getInt();
+ blockBuffer.reset();
+
+ int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE;
+ int comp = reader.getComparator().compare(key, offset, length,
+ blockBuffer.array(), keyOffset, klen);
+
+ if (comp == 0) {
+ if (seekBefore) {
+ if (lastKeyValueSize < 0) {
+ throw new IllegalStateException("blockSeek with seekBefore "
+ + "at the first key of the block: key="
+ + Bytes.toStringBinary(key) + ", blockOffset="
+ + block.getOffset() + ", onDiskSize="
+ + block.getOnDiskSizeWithHeader());
+ }
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1; // non exact match.
+ }
+ currKeyLen = klen;
+ currValueLen = vlen;
+ return 0; // indicate exact match
+ }
+
+ if (comp < 0) {
+ if (lastKeyValueSize > 0)
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1;
+ }
+
+ // The size of this key/value tuple, including key/value length fields.
+ lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE;
+ blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
+ } while (blockBuffer.remaining() > 0);
+
+ // Seek to the last key we successfully read. This will happen if this is
+ // the last key/value pair in the file, in which case the following call
+ // to next() has to return false.
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1; // didn't exactly find it.
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key) throws IOException {
+ return seekBefore(key, 0, key.length);
+ }
+
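+ /** Returns the first key of the given block, wrapped in a ByteBuffer. */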
+ private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+ ByteBuffer buffer = curBlock.getBufferWithoutHeader();
+ // It is safe to manipulate this buffer because we own the buffer object.
+ buffer.rewind();
+ int klen = buffer.getInt();
+ buffer.getInt();
+ ByteBuffer keyBuff = buffer.slice();
+ keyBuff.limit(klen);
+ keyBuff.rewind();
+ return keyBuff;
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key, int offset, int length)
+ throws IOException {
+ HFileReaderV2 reader2 = (HFileReaderV2) reader;
+ HFileBlock seekToBlock =
+ reader2.getDataBlockIndexReader().seekToDataBlock(
+ key, offset, length, block);
+ if (seekToBlock == null) {
+ return false;
+ }
+ ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+ if (reader.getComparator().compare(firstKey.array(),
+ firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
+ {
+ long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+ // The key we are interested in is the first key of this block, so to
+ // seek before it we must go to the previous block.
+ if (previousBlockOffset == -1) {
+ // we have a 'problem', the key we want is the first of the file.
+ return false;
+ }
+
+ // It is important that we compute and pass onDiskSize to the block
+ // reader so that it does not have to read the header separately to
+ // figure out the size.
+ seekToBlock = reader2.fsBlockReader.readBlockData(previousBlockOffset,
+ seekToBlock.getOffset() - previousBlockOffset, -1, pread);
+
+ // TODO shortcut: seek forward in this block to the last key of the
+ // block.
+ }
+ loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+ return true;
+ }
+
+ @Override
+ public String getKeyString() {
+ return Bytes.toStringBinary(blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE, currKeyLen);
+ }
+
+ @Override
+ public String getValueString() {
+ return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+ currValueLen);
+ }
+
+ }
+
+ /**
+ * Returns a buffer with the Bloom filter metadata. The caller takes
+ * ownership of the buffer.
+ */
+ @Override
+ public DataInput getBloomFilterMetadata() throws IOException {
+ for (HFileBlock b : loadOnOpenBlocks)
+ if (b.getBlockType() == BlockType.BLOOM_META)
+ return b.getByteStream();
+ return null;
+ }
+
+ @Override
+ public boolean isFileInfoLoaded() {
+ return true; // We load file info in the constructor in version 2.
+ }
+
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,484 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
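+ *
+ * <p>A minimal usage sketch (the configuration, path, and data here are
+ * illustrative assumptions, not part of this class):
+ *
+ * <pre>
+ * Configuration conf = HBaseConfiguration.create();
+ * FileSystem fs = FileSystem.get(conf);
+ * HFile.Writer writer = new HFileWriterV1(conf, fs, new Path("/tmp/example.hfile"));
+ * writer.append(Bytes.toBytes("key1"), Bytes.toBytes("value1"));
+ * writer.close();
+ * </pre>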
+ */
+public class HFileWriterV1 extends AbstractHFileWriter {
+
+ /** Meta data block name for bloom filter parameters. */
+ static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
+
+ /** Meta data block name for bloom filter bits. */
+ public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
+
+ private static final Log LOG = LogFactory.getLog(HFileWriterV1.class);
+
+ // A stream made per block written.
+ private DataOutputStream out;
+
+ // Offset where the current block began.
+ private long blockBegin;
+
+ // First keys of every block.
+ private ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
+
+ // Block offset in backing stream.
+ private ArrayList<Long> blockOffsets = new ArrayList<Long>();
+
+ // Raw (decompressed) data size.
+ private ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
+
+ private Compressor compressor;
+
+ // Additional byte array output stream used to fill block cache
+ private ByteArrayOutputStream baos;
+ private DataOutputStream baosDos;
+ private int blockNumber = 0;
+
+ static class WriterFactoryV1 extends HFile.WriterFactory {
+
+ WriterFactoryV1(Configuration conf) { super(conf); }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path) throws IOException {
+ return new HFileWriterV1(conf, fs, path);
+ }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path, int blockSize,
+ int bytesPerChecksum, Compression.Algorithm compressAlgo,
+ final KeyComparator comparator)
+ throws IOException {
+ return new HFileWriterV1(conf, fs, path, blockSize, bytesPerChecksum,
+ compressAlgo, comparator);
+ }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path, int blockSize,
+ int bytesPerChecksum, String compressAlgoName,
+ final KeyComparator comparator) throws IOException {
+ return new HFileWriterV1(conf, fs, path, blockSize, bytesPerChecksum,
+ compressAlgoName, comparator);
+ }
+
+ @Override
+ public Writer createWriter(final FSDataOutputStream ostream,
+ final int blockSize, final String compress,
+ final KeyComparator comparator) throws IOException {
+ return new HFileWriterV1(conf, ostream, blockSize, compress, comparator);
+ }
+
+ @Override
+ public Writer createWriter(final FSDataOutputStream ostream,
+ final int blockSize, final Compression.Algorithm compress,
+ final KeyComparator c) throws IOException {
+ return new HFileWriterV1(conf, ostream, blockSize, compress, c);
+ }
+ }
+
+ /** Constructor that uses all defaults for compression and block size. */
+ public HFileWriterV1(Configuration conf, FileSystem fs, Path path)
+ throws IOException {
+ this(conf, fs, path, HFile.DEFAULT_BLOCKSIZE,
+ HFile.DEFAULT_BYTES_PER_CHECKSUM, HFile.DEFAULT_COMPRESSION_ALGORITHM,
+ null);
+ }
+
+  /**
+   * Constructor that takes a path and creates the output stream (the stream
+   * is closed by {@link #close()}). Takes the compression algorithm name as
+   * a string.
+   */
+ public HFileWriterV1(Configuration conf, FileSystem fs, Path path,
+ int blockSize, int bytesPerChecksum, String compressAlgoName,
+ final KeyComparator comparator) throws IOException {
+ this(conf, fs, path, blockSize, bytesPerChecksum,
+ compressionByName(compressAlgoName), comparator);
+ }
+
+  /** Constructor that takes a path and creates the output stream (closed by {@link #close()}). */
+ public HFileWriterV1(Configuration conf, FileSystem fs, Path path,
+ int blockSize, int bytesPerChecksum, Compression.Algorithm compress,
+ final KeyComparator comparator) throws IOException {
+ super(conf, createOutputStream(conf, fs, path, bytesPerChecksum), path,
+ blockSize, compress, comparator);
+ }
+
+ /** Constructor that takes a stream. */
+ public HFileWriterV1(Configuration conf,
+ final FSDataOutputStream outputStream, final int blockSize,
+ final String compressAlgoName, final KeyComparator comparator)
+ throws IOException {
+ this(conf, outputStream, blockSize,
+ Compression.getCompressionAlgorithmByName(compressAlgoName),
+ comparator);
+ }
+
+ /** Constructor that takes a stream. */
+ public HFileWriterV1(Configuration conf,
+ final FSDataOutputStream outputStream, final int blockSize,
+ final Compression.Algorithm compress, final KeyComparator comparator)
+ throws IOException {
+ super(conf, outputStream, null, blockSize, compress, comparator);
+ }
+
+ /**
+   * If at a block boundary (the current block has reached blockSize
+   * uncompressed bytes), finishes the current block and opens a new one.
+ *
+ * @throws IOException
+ */
+ private void checkBlockBoundary() throws IOException {
+ if (this.out != null && this.out.size() < blockSize)
+ return;
+ finishBlock();
+ newBlock();
+ }
+
+ /**
+   * Finishes the current block, if one is open: releases the compressing
+   * stream and records the block's first key, offset, and uncompressed size.
+ *
+ * @throws IOException
+ */
+ private void finishBlock() throws IOException {
+ if (this.out == null)
+ return;
+ long now = System.currentTimeMillis();
+
+ int size = releaseCompressingStream(this.out);
+ this.out = null;
+ blockKeys.add(firstKeyInBlock);
+ blockOffsets.add(Long.valueOf(blockBegin));
+ blockDataSizes.add(Integer.valueOf(size));
+ this.totalUncompressedBytes += size;
+
+ HFile.writeTime += System.currentTimeMillis() - now;
+ HFile.writeOps++;
+
+ if (cacheDataBlocksOnWrite) {
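+      // Also cache the block we just finished, using the uncompressed copy
+      // that was accumulated in the side stream.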
+ baosDos.flush();
+ byte[] bytes = baos.toByteArray();
+ blockCache.cacheBlock(HFile.getBlockCacheKey(name, blockBegin),
+ new HFileBlock(BlockType.DATA,
+ (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
+ ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin));
+ baosDos.close();
+ }
+ blockNumber++;
+ }
+
+ /**
+ * Ready a new block for writing.
+ *
+ * @throws IOException
+ */
+ private void newBlock() throws IOException {
+ // This is where the next block begins.
+ blockBegin = outputStream.getPos();
+ this.out = getCompressingStream();
+ BlockType.DATA.write(out);
+ firstKeyInBlock = null;
+ if (cacheDataBlocksOnWrite) {
+ this.baos = new ByteArrayOutputStream();
+ this.baosDos = new DataOutputStream(baos);
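+      // Begin the cached copy with a placeholder header so it is laid out
+      // like a block read from disk.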
+ baosDos.write(HFileBlock.DUMMY_HEADER);
+ }
+ }
+
+ /**
+ * Sets up a compressor and creates a compression stream on top of
+ * this.outputStream. Get one per block written.
+ *
+ * @return A compressing stream; if 'none' compression, returned stream does
+ * not compress.
+ *
+ * @throws IOException
+ *
+   * @see #releaseCompressingStream(DataOutputStream)
+ */
+ private DataOutputStream getCompressingStream() throws IOException {
+ this.compressor = compressAlgo.getCompressor();
+    // Get a new DOS compression stream. In TFile the DOS is not closed, just
+    // finished, and that seems to be fine over there. TODO: check that the
+    // DOS does not retain memory. Should the 'flush' on the DOS be disabled,
+    // as BCFile over in TFile does, so that flushes don't pass through to the
+    // underlying compressed stream and the compressed stream is flushed only
+    // when done? It looks like the flush called here is a legitimate flush
+    // that should go through to the compressor, so it is left as is.
+ OutputStream os = this.compressAlgo.createCompressionStream(
+ this.outputStream, this.compressor, 0);
+ return new DataOutputStream(os);
+ }
+
+ /**
+   * Releases the block compressor and the compressing stream obtained from
+   * {@link #getCompressingStream()}.
+ *
+   * @param dos the compressing stream to release
+ *
+ * @return How much was written on this stream since it was taken out.
+ *
+ * @see #getCompressingStream()
+ *
+ * @throws IOException
+ */
+ private int releaseCompressingStream(final DataOutputStream dos)
+ throws IOException {
+ dos.flush();
+ this.compressAlgo.returnCompressor(this.compressor);
+ this.compressor = null;
+ return dos.size();
+ }
+
+ /**
+   * Add a meta block to the end of the file. Call before close(). Metadata
+   * blocks are expensive: fill one with plenty of serialized data rather than
+   * writing a metadata block per metadata instance. If the metadata is small,
+   * consider adding it to the file info instead, using
+   * {@link #appendFileInfo(byte[], byte[])}.
+ *
+ * @param metaBlockName
+ * name of the block
+ * @param content
+   *          will have readFields called on it later to get the data (DO NOT REUSE)
+ */
+ public void appendMetaBlock(String metaBlockName, Writable content) {
+ byte[] key = Bytes.toBytes(metaBlockName);
+ int i;
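+    // Find the insertion point that keeps metaNames sorted by key.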
+ for (i = 0; i < metaNames.size(); ++i) {
+ // stop when the current key is greater than our own
+ byte[] cur = metaNames.get(i);
+ if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
+ key.length) > 0) {
+ break;
+ }
+ }
+ metaNames.add(i, key);
+ metaData.add(i, content);
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param kv
+   *          KeyValue to add. Cannot be null or empty.
+ * @throws IOException
+ */
+ public void append(final KeyValue kv) throws IOException {
+ append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+ kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param key
+   *          Key to add. Cannot be null or empty.
+   * @param value
+   *          Value to add. Cannot be null or empty.
+ * @throws IOException
+ */
+ public void append(final byte[] key, final byte[] value) throws IOException {
+ append(key, 0, key.length, value, 0, value.length);
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+   * @param key byte array containing the key
+   * @param koffset offset of the key in its array
+   * @param klength length of the key
+   * @param value byte array containing the value
+   * @param voffset offset of the value in its array
+   * @param vlength length of the value
+ * @throws IOException
+ */
+ private void append(final byte[] key, final int koffset, final int klength,
+ final byte[] value, final int voffset, final int vlength)
+ throws IOException {
+ boolean dupKey = checkKey(key, koffset, klength);
+ checkValue(value, voffset, vlength);
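+    // Only consider starting a new block on a fresh key; a duplicate of the
+    // previous key stays in the current block.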
+ if (!dupKey) {
+ checkBlockBoundary();
+ }
+ // Write length of key and value and then actual key and value bytes.
+ this.out.writeInt(klength);
+ totalKeyLength += klength;
+ this.out.writeInt(vlength);
+ totalValueLength += vlength;
+ this.out.write(key, koffset, klength);
+ this.out.write(value, voffset, vlength);
+    // Is this the first key in the block? If so, remember it: it becomes
+    // this block's entry in the block index.
+ if (this.firstKeyInBlock == null) {
+ // Copy the key.
+ this.firstKeyInBlock = new byte[klength];
+ System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength);
+ }
+ this.lastKeyBuffer = key;
+ this.lastKeyOffset = koffset;
+ this.lastKeyLength = klength;
+ this.entryCount++;
+ // If we are pre-caching blocks on write, fill byte array stream
+ if (cacheDataBlocksOnWrite) {
+ this.baosDos.writeInt(klength);
+ this.baosDos.writeInt(vlength);
+ this.baosDos.write(key, koffset, klength);
+ this.baosDos.write(value, voffset, vlength);
+ }
+ }
+
+ public void close() throws IOException {
+ if (this.outputStream == null) {
+ return;
+ }
+    // Write out the end of the data blocks, then write the meta data blocks,
+    // followed by the file info, the data block index, and the meta block
+    // index.
+
+ finishBlock();
+
+ FixedFileTrailer trailer = new FixedFileTrailer(1);
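+    // The trailer is filled in below and written out at the very end by
+    // finishClose(trailer).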
+
+ // Write out the metadata blocks if any.
+ ArrayList<Long> metaOffsets = null;
+ ArrayList<Integer> metaDataSizes = null;
+ if (metaNames.size() > 0) {
+ metaOffsets = new ArrayList<Long>(metaNames.size());
+ metaDataSizes = new ArrayList<Integer>(metaNames.size());
+ for (int i = 0; i < metaNames.size(); ++i) {
+ // store the beginning offset
+ long curPos = outputStream.getPos();
+ metaOffsets.add(curPos);
+ // write the metadata content
+ DataOutputStream dos = getCompressingStream();
+ BlockType.META.write(dos);
+ metaData.get(i).write(dos);
+ int size = releaseCompressingStream(dos);
+ // store the metadata size
+ metaDataSizes.add(size);
+ }
+ }
+
+ writeFileInfo(trailer, outputStream);
+
+ // Write the data block index.
+ trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream,
+ this.blockKeys, this.blockOffsets, this.blockDataSizes));
+ LOG.info("Wrote a version 1 block index with " + this.blockKeys.size()
+ + " keys");
+
+ if (metaNames.size() > 0) {
+ // Write the meta index.
+ writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes);
+ }
+
+ // Now finish off the trailer.
+ trailer.setDataIndexCount(blockKeys.size());
+
+ finishClose(trailer);
+ }
+
+ @Override
+ protected void finishFileInfo() throws IOException {
+ super.finishFileInfo();
+
+ // In version 1, we store comparator name in the file info.
+ fileInfo.append(FileInfo.COMPARATOR,
+ Bytes.toBytes(comparator.getClass().getName()), false);
+ }
+
+ @Override
+ public void addInlineBlockWriter(InlineBlockWriter bloomWriter) {
+ // Inline blocks only exist in HFile format version 2.
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Version 1 Bloom filters are stored in two meta blocks with two different
+ * keys.
+ */
+ @Override
+ public void addBloomFilter(BloomFilterWriter bfw) {
+ appendMetaBlock(BLOOM_FILTER_META_KEY,
+ bfw.getMetaWriter());
+ Writable dataWriter = bfw.getDataWriter();
+ if (dataWriter != null) {
+ appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter);
+ }
+ }
+
+ /**
+   * Write out the block index in the legacy version 1 format, which can
+   * still be read by
+   * {@link HFileBlockIndex.BlockIndexReader#readRootIndex(java.io.DataInputStream,
+   * int)}.
+   *
+   * @param out the stream to write to
+   * @param keys first keys of the indexed blocks
+   * @param offsets file offsets of the indexed blocks
+   * @param uncompressedSizes in contrast with the version 2 root index format,
+   *          the sizes stored in version 1 are uncompressed sizes
+   * @return the file offset at which the index starts
+ * @throws IOException
+ */
+ private static long writeBlockIndex(final FSDataOutputStream out,
+ final List<byte[]> keys, final List<Long> offsets,
+ final List<Integer> uncompressedSizes) throws IOException {
+ long pos = out.getPos();
+ // Don't write an index if nothing in the index.
+ if (keys.size() > 0) {
+ BlockType.INDEX_V1.write(out);
+ // Write the index.
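+      // Each entry: the block's offset (long), its uncompressed size (int),
+      // and its first key as a length-prefixed byte array.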
+ for (int i = 0; i < keys.size(); ++i) {
+ out.writeLong(offsets.get(i).longValue());
+ out.writeInt(uncompressedSizes.get(i).intValue());
+ byte[] key = keys.get(i);
+ Bytes.writeByteArray(out, key);
+ }
+ }
+ return pos;
+ }
+
+}
\ No newline at end of file