Posted to commits@hbase.apache.org by st...@apache.org on 2008/07/17 09:17:28 UTC
svn commit: r677517 [5/6] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/ipc/
src/java/org/apache/hadoop/hbase/master/
src/java/org/apache/hadoop/hbase/regionserv...
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,1032 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.BlockFSInputStream;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.onelab.filter.BloomFilter;
+import org.onelab.filter.Key;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+
+/**
+ * An HStore data file. HStores usually have one or more of these files. They
+ * are produced by flushing the memcache to disk.
+ *
+ * <p>Each HStore maintains a bunch of different data files. The filename is a
+ * mix of the parent dir, the region name, the column name, and a file
+ * identifier. The name may also be a reference to a store file located
+ * elsewhere. This class handles all that path-building stuff for you.
+ *
+ * <p>An HStoreFile usually tracks 4 things: its parent dir, the region
+ * identifier, the column family, and the file identifier. If you know those
+ * four things, you know how to obtain the right HStoreFile. HStoreFiles may
+ * also reference store files in another region, serving either the
+ * top half or the bottom half of the remote file. Such references
+ * are made when regions are split.
+ *
+ * <p>Plain HStoreFiles are named for a randomly generated id as in:
+ * <code>1278437856009925445</code>. A file by this name is made in both the
+ * <code>mapfiles</code> and <code>info</code> subdirectories of an
+ * HStore column family directory. E.g. if the column family is 'anchor:', then
+ * under the region directory there is a subdirectory named 'anchor' within
+ * which is a 'mapfiles' and 'info' subdirectory. In each will be found a
+ * file named something like <code>1278437856009925445</code>, one to hold the
+ * data in 'mapfiles' and one under 'info' that holds the sequence id for this
+ * store file.
+ *
+ * <p>References to store files located over in some other region look like
+ * this:
+ * <code>1278437856009925445.hbaserepository,qAReLZD-OyQORZWq_vqR1k==,959247014679548184</code>:
+ * i.e. an id followed by the name of the referenced region. The data
+ * ('mapfiles') of HStoreFile references are empty. The accompanying
+ * <code>info</code> file contains the
+ * midkey, the id of the remote store we're referencing and whether we're
+ * to serve the top or bottom region of the remote store file. Note, a region
+ * is not splittable if it has instances of store file references (References
+ * are cleaned up by compactions).
+ *
+ * <p>When merging or splitting HRegions, we might want to modify one of the
+ * params for an HStoreFile (effectively moving it elsewhere).
+ */
+public class HStoreFile implements HConstants {
+ static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
+ static final byte INFO_SEQ_NUM = 0;
+ static final String HSTORE_DATFILE_DIR = "mapfiles";
+ static final String HSTORE_INFO_DIR = "info";
+ static final String HSTORE_FILTER_DIR = "filter";
+
+ /**
+ * For split HStoreFiles, specifies if the file covers the lower half or
+ * the upper half of the key range
+ */
+ public static enum Range {
+ /** HStoreFile contains upper half of key range */
+ top,
+ /** HStoreFile contains lower half of key range */
+ bottom
+ }
+
+ private final static Random rand = new Random();
+
+ private final Path basedir;
+ private final int encodedRegionName;
+ private final byte [] colFamily;
+ private final long fileId;
+ private final HBaseConfiguration conf;
+ private final FileSystem fs;
+ private final Reference reference;
+
+ /**
+ * Constructor that fully initializes the object
+ * @param conf Configuration object
+ * @param basedir qualified path that is parent of region directory
+ * @param encodedRegionName file name friendly name of the region
+ * @param colFamily name of the column family
+ * @param fileId file identifier
+ * @param ref Reference to another HStoreFile.
+ * @throws IOException
+ */
+ HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
+ int encodedRegionName, byte [] colFamily, long fileId,
+ final Reference ref) throws IOException {
+ this.conf = conf;
+ this.fs = fs;
+ this.basedir = basedir;
+ this.encodedRegionName = encodedRegionName;
+ this.colFamily = colFamily;
+
+ long id = fileId;
+ if (id == -1) {
+ Path mapdir = HStoreFile.getMapDir(basedir, encodedRegionName, colFamily);
+ Path testpath = null;
+ do {
+ id = Math.abs(rand.nextLong());
+ testpath = new Path(mapdir, createHStoreFilename(id, -1));
+ } while(fs.exists(testpath));
+ }
+ this.fileId = id;
+
+    // If a reference, construction does not write the pointer files. That's
+ // done by invocations of writeReferenceFiles(hsf, fs). Happens at fast
+ // split time.
+ this.reference = ref;
+ }
+
+  /** @return true if this is a reference to another store file */
+ boolean isReference() {
+ return reference != null;
+ }
+
+ Reference getReference() {
+ return reference;
+ }
+
+ int getEncodedRegionName() {
+ return this.encodedRegionName;
+ }
+
+ /** @return the column family */
+ byte [] getColFamily() {
+ return colFamily;
+ }
+
+ /** @return the file identifier */
+ long getFileId() {
+ return fileId;
+ }
+
+ // Build full filenames from those components
+
+ /** @return path for MapFile */
+ Path getMapFilePath() {
+ if (isReference()) {
+ return getMapFilePath(encodedRegionName, fileId,
+ reference.getEncodedRegionName());
+ }
+ return getMapFilePath(this.encodedRegionName, fileId);
+ }
+
+ private Path getMapFilePath(final Reference r) {
+ if (r == null) {
+ return getMapFilePath();
+ }
+ return getMapFilePath(r.getEncodedRegionName(), r.getFileId());
+ }
+
+ private Path getMapFilePath(final int encodedName, final long fid) {
+ return getMapFilePath(encodedName, fid, HRegionInfo.NO_HASH);
+ }
+
+ private Path getMapFilePath(final int encodedName, final long fid,
+ final int ern) {
+ return new Path(HStoreFile.getMapDir(basedir, encodedName, colFamily),
+ createHStoreFilename(fid, ern));
+ }
+
+ /** @return path for info file */
+ Path getInfoFilePath() {
+ if (isReference()) {
+ return getInfoFilePath(encodedRegionName, fileId,
+ reference.getEncodedRegionName());
+
+ }
+ return getInfoFilePath(encodedRegionName, fileId);
+ }
+
+ private Path getInfoFilePath(final int encodedName, final long fid) {
+ return getInfoFilePath(encodedName, fid, HRegionInfo.NO_HASH);
+ }
+
+ private Path getInfoFilePath(final int encodedName, final long fid,
+ final int ern) {
+ return new Path(HStoreFile.getInfoDir(basedir, encodedName, colFamily),
+ createHStoreFilename(fid, ern));
+ }
+
+ // File handling
+
+ /*
+ * Split by making two new store files that reference top and bottom regions
+ * of original store file.
+   * @param dstA the file which will contain keys from the start of the source
+   * @param dstB the file which will contain keys from midKey to end of source
+   * @param fs file system
+   * @throws IOException
+   */
+ void splitStoreFile(final HStoreFile dstA, final HStoreFile dstB,
+ final FileSystem fs)
+ throws IOException {
+ dstA.writeReferenceFiles(fs);
+ dstB.writeReferenceFiles(fs);
+ }
+
+ void writeReferenceFiles(final FileSystem fs)
+ throws IOException {
+ createOrFail(fs, getMapFilePath());
+ writeSplitInfo(fs);
+ }
+
+ /*
+ * If reference, create and write the remote store file id, the midkey and
+ * whether we're going against the top file region of the referent out to
+ * the info file.
+ * @param fs
+ * @throws IOException
+ */
+ private void writeSplitInfo(final FileSystem fs) throws IOException {
+ Path p = getInfoFilePath();
+ if (fs.exists(p)) {
+ throw new IOException("File already exists " + p.toString());
+ }
+ FSDataOutputStream out = fs.create(p);
+ try {
+ reference.write(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * @see #writeSplitInfo(FileSystem fs)
+ */
+ static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
+ throws IOException {
+ FSDataInputStream in = fs.open(p);
+ try {
+ HStoreFile.Reference r = new HStoreFile.Reference();
+ r.readFields(in);
+ return r;
+ } finally {
+ in.close();
+ }
+ }
+
+ private void createOrFail(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs.exists(p)) {
+ throw new IOException("File already exists " + p.toString());
+ }
+ if (!fs.createNewFile(p)) {
+ throw new IOException("Failed create of " + p);
+ }
+ }
+
+ /**
+ * Reads in an info file
+ *
+ * @param fs file system
+ * @return The sequence id contained in the info file
+ * @throws IOException
+ */
+ long loadInfo(FileSystem fs) throws IOException {
+ Path p = null;
+ if (isReference()) {
+ p = getInfoFilePath(reference.getEncodedRegionName(), reference.getFileId());
+ } else {
+ p = getInfoFilePath();
+ }
+ DataInputStream in = new DataInputStream(fs.open(p));
+ try {
+ byte flag = in.readByte();
+ if(flag == INFO_SEQ_NUM) {
+ return in.readLong();
+ }
+ throw new IOException("Cannot process log file: " + p);
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Writes the file-identifier to disk
+ *
+ * @param fs file system
+ * @param infonum file id
+ * @throws IOException
+ */
+ void writeInfo(FileSystem fs, long infonum) throws IOException {
+ Path p = getInfoFilePath();
+ FSDataOutputStream out = fs.create(p);
+ try {
+ out.writeByte(INFO_SEQ_NUM);
+ out.writeLong(infonum);
+ } finally {
+ out.close();
+ }
+ }
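
The info file's on-disk layout, as written by writeInfo above and read back by loadInfo, is just the INFO_SEQ_NUM flag byte followed by the sequence id as a long. A minimal standalone sketch of that round trip, using plain java.io streams in place of the FSDataOutputStream/FSDataInputStream that HStoreFile runs against HDFS (the class name and sequence id here are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class InfoFileSketch {
      static final byte INFO_SEQ_NUM = 0; // same flag byte as HStoreFile

      public static void main(String[] args) throws IOException {
        // Write: flag byte, then the sequence id, mirroring writeInfo().
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
          out.writeByte(INFO_SEQ_NUM);
          out.writeLong(1278437856009925445L);
        } finally {
          out.close();
        }

        // Read: check the flag, then pull the id, mirroring loadInfo().
        DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        try {
          if (in.readByte() != INFO_SEQ_NUM) {
            throw new IOException("Unexpected info file flag");
          }
          System.out.println("sequence id: " + in.readLong());
        } finally {
          in.close();
        }
      }
    }
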
+
+ /**
+ * Delete store map files.
+ * @throws IOException
+ */
+ public void delete() throws IOException {
+ fs.delete(getMapFilePath(), true);
+ fs.delete(getInfoFilePath(), true);
+ }
+
+ /**
+ * Renames the mapfiles and info directories under the passed
+ * <code>hsf</code> directory.
+ * @param fs
+ * @param hsf
+ * @return True if succeeded.
+ * @throws IOException
+ */
+ public boolean rename(final FileSystem fs, final HStoreFile hsf)
+ throws IOException {
+ Path src = getMapFilePath();
+ if (!fs.exists(src)) {
+ throw new FileNotFoundException(src.toString());
+ }
+ boolean success = fs.rename(src, hsf.getMapFilePath());
+ if (!success) {
+ LOG.warn("Failed rename of " + src + " to " + hsf.getMapFilePath());
+ } else {
+ src = getInfoFilePath();
+ if (!fs.exists(src)) {
+ throw new FileNotFoundException(src.toString());
+ }
+ success = fs.rename(src, hsf.getInfoFilePath());
+ if (!success) {
+ LOG.warn("Failed rename of " + src + " to " + hsf.getInfoFilePath());
+ }
+ }
+ return success;
+ }
+
+ /**
+ * Get reader for the store file map file.
+ * Client is responsible for closing file when done.
+ * @param fs
+ * @param bloomFilter If true, a bloom filter exists
+ * @param blockCacheEnabled If true, MapFile blocks should be cached.
+ * @return BloomFilterMapFile.Reader
+ * @throws IOException
+ */
+ public synchronized BloomFilterMapFile.Reader getReader(final FileSystem fs,
+ final boolean bloomFilter, final boolean blockCacheEnabled)
+ throws IOException {
+ if (isReference()) {
+ return new HStoreFile.HalfMapFileReader(fs,
+ getMapFilePath(reference).toString(), conf,
+ reference.getFileRegion(), reference.getMidkey(), bloomFilter,
+ blockCacheEnabled);
+ }
+ return new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
+ conf, bloomFilter, blockCacheEnabled);
+ }
+
+ /**
+ * Get a store file writer.
+ * Client is responsible for closing file when done.
+ * @param fs
+ * @param compression Pass <code>SequenceFile.CompressionType.NONE</code>
+ * for none.
+ * @param bloomFilter If true, create a bloom filter
+ * @param nrows number of rows expected. Required if bloomFilter is true.
+ * @return MapFile.Writer
+ * @throws IOException
+ */
+ public MapFile.Writer getWriter(final FileSystem fs,
+ final SequenceFile.CompressionType compression,
+ final boolean bloomFilter, int nrows)
+ throws IOException {
+ if (isReference()) {
+ throw new IOException("Illegal Access: Cannot get a writer on a" +
+ "HStoreFile reference");
+ }
+ return new BloomFilterMapFile.Writer(conf, fs,
+ getMapFilePath().toString(), compression, bloomFilter, nrows);
+ }
+
+ /**
+ * @return Length of the store map file. If a reference, size is
+ * approximation.
+ * @throws IOException
+ */
+ public long length() throws IOException {
+ Path p = new Path(getMapFilePath(reference), MapFile.DATA_FILE_NAME);
+ long l = p.getFileSystem(conf).getFileStatus(p).getLen();
+ return (isReference())? l / 2: l;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId +
+ (isReference()? "-" + reference.toString(): "");
+ }
+
+ static boolean isTopFileRegion(final Range r) {
+ return r.equals(Range.top);
+ }
+
+ private static String createHStoreFilename(final long fid,
+ final int encodedRegionName) {
+ return Long.toString(fid) +
+ ((encodedRegionName != HRegionInfo.NO_HASH)?
+ "." + encodedRegionName : "");
+ }
+
+ /**
+ * @param dir Base directory
+ * @param encodedRegionName Encoding of region name.
+ * @param f Column family.
+ * @return path for map file directory
+ */
+ public static Path getMapDir(Path dir, int encodedRegionName,
+ final byte [] f) {
+ return getFamilySubDir(dir, encodedRegionName, f, HSTORE_DATFILE_DIR);
+ }
+
+ /**
+ * @param dir Base directory
+ * @param encodedRegionName Encoding of region name.
+ * @param f Column family.
+ * @return the info directory path
+ */
+ public static Path getInfoDir(Path dir, int encodedRegionName, byte [] f) {
+ return getFamilySubDir(dir, encodedRegionName, f, HSTORE_INFO_DIR);
+ }
+
+ /**
+ * @param dir Base directory
+ * @param encodedRegionName Encoding of region name.
+ * @param f Column family.
+ * @return the bloom filter directory path
+ */
+ @Deprecated
+ public static Path getFilterDir(Path dir, int encodedRegionName,
+ final byte [] f) {
+ return getFamilySubDir(dir, encodedRegionName, f, HSTORE_FILTER_DIR);
+ }
+
+ /*
+ * @param base Base directory
+ * @param encodedRegionName Encoding of region name.
+ * @param f Column family.
+ * @param subdir Subdirectory to create under column family/store directory.
+   * @return Path to the <code>subdir</code> under the column family directory.
+ */
+ private static Path getFamilySubDir(final Path base,
+ final int encodedRegionName, final byte [] f, final String subdir) {
+ return new Path(base, new Path(Integer.toString(encodedRegionName),
+ new Path(Bytes.toString(f), subdir)));
+ }
+
+ /*
+ * Data structure to hold reference to a store file over in another region.
+ */
+ static class Reference implements Writable {
+ private int encodedRegionName;
+ private long fileid;
+ private Range region;
+ private HStoreKey midkey;
+
+ Reference(final int ern, final long fid, final HStoreKey m,
+ final Range fr) {
+ this.encodedRegionName = ern;
+ this.fileid = fid;
+ this.region = fr;
+ this.midkey = m;
+ }
+
+ Reference() {
+ this(-1, -1, null, Range.bottom);
+ }
+
+ long getFileId() {
+ return fileid;
+ }
+
+ Range getFileRegion() {
+ return region;
+ }
+
+ HStoreKey getMidkey() {
+ return midkey;
+ }
+
+ int getEncodedRegionName() {
+ return this.encodedRegionName;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return encodedRegionName + "/" + fileid + "/" + region;
+ }
+
+ // Make it serializable.
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ // Write out the encoded region name as a String. Doing it as a String
+      // keeps a Reference's serialization backward compatible with
+      // pre-HBASE-82 serializations. The alternative is rewriting all
+ // info files in hbase (Serialized References are written into the
+ // 'info' file that accompanies HBase Store files).
+ out.writeUTF(Integer.toString(encodedRegionName));
+ out.writeLong(fileid);
+ // Write true if we're doing top of the file.
+ out.writeBoolean(isTopFileRegion(region));
+ midkey.write(out);
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ this.encodedRegionName = Integer.parseInt(in.readUTF());
+ fileid = in.readLong();
+ boolean tmp = in.readBoolean();
+ // If true, set region to top.
+ region = tmp? Range.top: Range.bottom;
+ midkey = new HStoreKey();
+ midkey.readFields(in);
+ }
+ }
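
Reference.write above pins down a small wire format: the encoded region name as a UTF string (kept a String so pre-HBASE-82 info files still parse), then the file id, a top/bottom boolean, and finally the midkey. A hedged sketch of the first three fields using only java.io; the trailing HStoreKey midkey is omitted because it needs HBase classes, and the ids are made up:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class ReferenceFormatSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // Same field order as Reference.write().
        out.writeUTF(Integer.toString(1184657498)); // encoded region name
        out.writeLong(959247014679548184L);         // file id
        out.writeBoolean(true);                     // true == top half
        out.close();

        DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        int ern = Integer.parseInt(in.readUTF());
        long fid = in.readLong();
        String range = in.readBoolean() ? "top" : "bottom";
        in.close();
        // Prints in the same shape as Reference.toString().
        System.out.println(ern + "/" + fid + "/" + range);
      }
    }
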
+
+ /**
+ * Hbase customizations of MapFile.
+ */
+ static class HbaseMapFile extends MapFile {
+ static final Class<? extends Writable> KEY_CLASS = HStoreKey.class;
+ static final Class<? extends Writable> VALUE_CLASS =
+ ImmutableBytesWritable.class;
+
+ /**
+ * Custom bloom filter key maker.
+ * @param key
+ * @return Key made of bytes of row only.
+ */
+ protected static Key getBloomFilterKey(WritableComparable key) {
+ return new Key(((HStoreKey) key).getRow());
+ }
+
+ /**
+ * A reader capable of reading and caching blocks of the data file.
+ */
+ static class HbaseReader extends MapFile.Reader {
+
+ private final boolean blockCacheEnabled;
+
+ /**
+ * @param fs
+ * @param dirName
+ * @param conf
+ * @throws IOException
+ */
+ public HbaseReader(FileSystem fs, String dirName, Configuration conf)
+ throws IOException {
+ this(fs, dirName, conf, false);
+ }
+
+ /**
+ * @param fs
+ * @param dirName
+ * @param conf
+ * @param blockCacheEnabled
+ * @throws IOException
+ */
+ public HbaseReader(FileSystem fs, String dirName, Configuration conf,
+ boolean blockCacheEnabled)
+ throws IOException {
+ super(fs, dirName, null, conf, false); // defer opening streams
+ this.blockCacheEnabled = blockCacheEnabled;
+ open(fs, dirName, null, conf);
+
+ // Force reading of the mapfile index by calling midKey.
+ // Reading the index will bring the index into memory over
+ // here on the client and then close the index file freeing
+ // up socket connection and resources in the datanode.
+      // Usually, the first access on a MapFile.Reader will load the
+      // index; we force the issue in HStoreFile MapFiles because an
+ // access may not happen for some time; meantime we're
+ // using up datanode resources. See HADOOP-2341.
+ midKey();
+ }
+
+ @Override
+ protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
+ FileSystem fs, Path dataFile, Configuration conf)
+ throws IOException {
+ if (!blockCacheEnabled) {
+ return super.createDataFileReader(fs, dataFile, conf);
+ }
+ LOG.info("Block Cache enabled");
+ final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
+ 64 * 1024);
+ return new SequenceFile.Reader(fs, dataFile, conf) {
+ @Override
+ protected FSDataInputStream openFile(FileSystem fs, Path file,
+ int bufferSize, long length) throws IOException {
+
+ return new FSDataInputStream(new BlockFSInputStream(
+ super.openFile(fs, file, bufferSize, length), length,
+ blockSize));
+ }
+ };
+ }
+ }
+
+ static class HbaseWriter extends MapFile.Writer {
+ /**
+ * @param conf
+ * @param fs
+ * @param dirName
+ * @param compression
+ * @throws IOException
+ */
+ public HbaseWriter(Configuration conf, FileSystem fs, String dirName,
+ SequenceFile.CompressionType compression)
+ throws IOException {
+ super(conf, fs, dirName, KEY_CLASS, VALUE_CLASS, compression);
+ // Default for mapfiles is 128. Makes random reads faster if we
+ // have more keys indexed and we're not 'next'-ing around in the
+ // mapfile.
+ setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
+ }
+ }
+ }
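
The one tuning knob HbaseWriter exposes here is the MapFile index interval: how many entries sit between index entries. A smaller interval means a larger in-memory index but faster random reads. A small sketch of setting it through the Hadoop Configuration API, assuming hadoop-core on the classpath; the property name and the default of 128 are the ones the code above reads, the value 32 is just an example:

    import org.apache.hadoop.conf.Configuration;

    public class IndexIntervalSketch {
      public static void main(String[] args) {
        // Denser index (smaller interval): faster random gets, more
        // index memory.
        Configuration conf = new Configuration();
        conf.setInt("hbase.io.index.interval", 32);
        System.out.println(conf.getInt("hbase.io.index.interval", 128));
      }
    }
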
+
+ /**
+ * On write, all keys are added to a bloom filter. On read, all keys are
+ * tested first against bloom filter. Keys are HStoreKey. If passed bloom
+ * filter is null, just passes invocation to parent.
+ */
+ static class BloomFilterMapFile extends HbaseMapFile {
+ protected static final String BLOOMFILTER_FILE_NAME = "filter";
+
+ static class Reader extends HbaseReader {
+ private final BloomFilter bloomFilter;
+
+ /**
+ * @param fs
+ * @param dirName
+ * @param conf
+ * @param filter
+ * @param blockCacheEnabled
+ * @throws IOException
+ */
+ public Reader(FileSystem fs, String dirName, Configuration conf,
+ final boolean filter, final boolean blockCacheEnabled)
+ throws IOException {
+ super(fs, dirName, conf, blockCacheEnabled);
+ if (filter) {
+ this.bloomFilter = loadBloomFilter(fs, dirName);
+ } else {
+ this.bloomFilter = null;
+ }
+ }
+
+ private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
+ throws IOException {
+ Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
+ if(!fs.exists(filterFile)) {
+ throw new FileNotFoundException("Could not find bloom filter: " +
+ filterFile);
+ }
+ BloomFilter filter = new BloomFilter();
+ FSDataInputStream in = fs.open(filterFile);
+ try {
+          // Read into the just-created local instance; the bloomFilter
+          // field has not been assigned yet at this point.
+          filter.readFields(in);
+        } finally {
+          // Close the stream, not the shared FileSystem handle.
+          in.close();
+ }
+ return filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Writable get(WritableComparable key, Writable val)
+ throws IOException {
+ if (bloomFilter == null) {
+ return super.get(key, val);
+ }
+ if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bloom filter reported that key exists");
+ }
+ return super.get(key, val);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bloom filter reported that key does not exist");
+ }
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public WritableComparable getClosest(WritableComparable key,
+ Writable val) throws IOException {
+ if (bloomFilter == null) {
+ return super.getClosest(key, val);
+ }
+ // Note - the key being passed to us is always a HStoreKey
+ if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bloom filter reported that key exists");
+ }
+ return super.getClosest(key, val);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bloom filter reported that key does not exist");
+ }
+ return null;
+ }
+
+ /* @return size of the bloom filter */
+ int getBloomFilterSize() {
+ return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
+ }
+ }
+
+ static class Writer extends HbaseWriter {
+ private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
+ private final BloomFilter bloomFilter;
+ private final String dirName;
+ private final FileSystem fs;
+
+ /**
+ * @param conf
+ * @param fs
+ * @param dirName
+ * @param compression
+ * @param filter
+ * @param nrows
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public Writer(Configuration conf, FileSystem fs, String dirName,
+ SequenceFile.CompressionType compression, final boolean filter,
+ int nrows)
+ throws IOException {
+ super(conf, fs, dirName, compression);
+ this.dirName = dirName;
+ this.fs = fs;
+ if (filter) {
+ /*
+ * There is no way to automatically determine the vector size and the
+ * number of hash functions to use. In particular, bloom filters are
+ * very sensitive to the number of elements inserted into them. For
+ * HBase, the number of entries depends on the size of the data stored
+ * in the column. Currently the default region size is 256MB, so the
+ * number of entries is approximately
+ * 256MB / (average value size for column).
+ *
+ * If m denotes the number of bits in the Bloom filter (vectorSize),
+ * n denotes the number of elements inserted into the Bloom filter and
+ * k represents the number of hash functions used (nbHash), then
+ * according to Broder and Mitzenmacher,
+ *
+ * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
+ *
+ * the probability of false positives is minimized when k is
+ * approximately m/n ln(2).
+ */
+ this.bloomFilter = new BloomFilter(
+ (int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
+ (int) Math.ceil(
+ (DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
+ Math.log(2.0))
+ );
+ } else {
+ this.bloomFilter = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void append(WritableComparable key, Writable val)
+ throws IOException {
+ if (bloomFilter != null) {
+ bloomFilter.add(getBloomFilterKey(key));
+ }
+ super.append(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public synchronized void close() throws IOException {
+ super.close();
+ if (this.bloomFilter != null) {
+ flushBloomFilter();
+ }
+ }
+
+ /**
+ * Flushes bloom filter to disk
+ *
+ * @throws IOException
+ */
+ private void flushBloomFilter() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushing bloom filter for " + this.dirName);
+ }
+ FSDataOutputStream out =
+ fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
+ try {
+ bloomFilter.write(out);
+ } finally {
+ out.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushed bloom filter for " + this.dirName);
+ }
+ }
+ }
+ }
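
The Writer sizes its filter from the bound quoted in the long comment above: with the hash-function count k fixed at 4, false positives are minimized when k = (m/n) ln 2, so the vector needs m = k * n / ln 2 bits for n expected rows. A one-method sketch of that arithmetic (the row count is illustrative):

    public class BloomSizingSketch {
      public static void main(String[] args) {
        double k = 4.0;       // DEFAULT_NUMBER_OF_HASH_FUNCTIONS
        int nrows = 1000000;  // expected entries, e.g. region size / value size
        // m = k * n / ln 2, the same expression the Writer constructor uses.
        int vectorSize = (int) Math.ceil((k * nrows) / Math.log(2.0));
        System.out.println("vector size: " + vectorSize); // 5770781 bits
      }
    }
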
+
+ /**
+ * A facade for a {@link MapFile.Reader} that serves up either the top or
+ * bottom half of a MapFile (where 'bottom' is the first half of the file
+ * containing the keys that sort lowest and 'top' is the second half of the
+ * file with keys that sort greater than those of the bottom half).
+ * Subclasses BloomFilterMapFile.Reader in case the backing mapfile has an
+ * associated bloom filter.
+ *
+ * <p>This file is not splittable. Calls to {@link #midKey()} return null.
+ */
+ static class HalfMapFileReader extends BloomFilterMapFile.Reader {
+ private final boolean top;
+ private final WritableComparable midkey;
+ private boolean firstNextCall = true;
+
+ HalfMapFileReader(final FileSystem fs, final String dirName,
+ final Configuration conf, final Range r,
+ final WritableComparable midKey)
+ throws IOException {
+ this(fs, dirName, conf, r, midKey, false, false);
+ }
+
+ HalfMapFileReader(final FileSystem fs, final String dirName,
+ final Configuration conf, final Range r,
+ final WritableComparable midKey, final boolean filter,
+ final boolean blockCacheEnabled)
+ throws IOException {
+ super(fs, dirName, conf, filter, blockCacheEnabled);
+ top = isTopFileRegion(r);
+ midkey = midKey;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkKey(final WritableComparable key)
+ throws IOException {
+ if (top) {
+ if (key.compareTo(midkey) < 0) {
+ throw new IOException("Illegal Access: Key is less than midKey of " +
+ "backing mapfile");
+ }
+ } else if (key.compareTo(midkey) >= 0) {
+ throw new IOException("Illegal Access: Key is greater than or equal " +
+ "to midKey of backing mapfile");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public synchronized void finalKey(WritableComparable key)
+ throws IOException {
+ if (top) {
+ super.finalKey(key);
+ } else {
+ reset();
+ Writable value = new ImmutableBytesWritable();
+ WritableComparable k = super.getClosest(midkey, value, true);
+ ByteArrayOutputStream byteout = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(byteout);
+ k.write(out);
+ ByteArrayInputStream bytein =
+ new ByteArrayInputStream(byteout.toByteArray());
+ DataInputStream in = new DataInputStream(bytein);
+ key.readFields(in);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public synchronized Writable get(WritableComparable key, Writable val)
+ throws IOException {
+ checkKey(key);
+ return super.get(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized WritableComparable getClosest(WritableComparable key,
+ Writable val)
+ throws IOException {
+ WritableComparable closest = null;
+ if (top) {
+ // If top, the lowest possible key is midkey. Do not have to check
+ // what comes back from super getClosest. Will return exact match or
+ // greater.
+ closest = (key.compareTo(this.midkey) < 0)?
+ this.midkey: super.getClosest(key, val);
+ } else {
+ // We're serving bottom of the file.
+ if (key.compareTo(this.midkey) < 0) {
+ // Check key is within range for bottom.
+ closest = super.getClosest(key, val);
+ // midkey was made against largest store file at time of split. Smaller
+ // store files could have anything in them. Check return value is
+ // not beyond the midkey (getClosest returns exact match or next
+ // after).
+ if (closest != null && closest.compareTo(this.midkey) >= 0) {
+ // Don't let this value out.
+ closest = null;
+ }
+ }
+ // Else, key is > midkey so let out closest = null.
+ }
+ return closest;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unused")
+ @Override
+ public synchronized WritableComparable midKey() throws IOException {
+      // Returns null to indicate file is not splittable.
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized boolean next(WritableComparable key, Writable val)
+ throws IOException {
+ if (firstNextCall) {
+ firstNextCall = false;
+ if (this.top) {
+ // Seek to midkey. Midkey may not exist in this file. That should be
+ // fine. Then we'll either be positioned at end or start of file.
+ WritableComparable nearest = getClosest(midkey, val);
+ // Now copy the mid key into the passed key.
+ if (nearest != null) {
+ Writables.copyWritable(nearest, key);
+ return true;
+ }
+ return false;
+ }
+ }
+ boolean result = super.next(key, val);
+ if (!top && key.compareTo(midkey) >= 0) {
+ result = false;
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public synchronized void reset() throws IOException {
+ if (top) {
+ firstNextCall = true;
+ seek(midkey);
+ return;
+ }
+ super.reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public synchronized boolean seek(WritableComparable key)
+ throws IOException {
+ checkKey(key);
+ return super.seek(key);
+ }
+ }
+}
\ No newline at end of file
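
HalfMapFileReader's range enforcement is concentrated in checkKey: a top-half reader rejects keys below the midkey, a bottom-half reader rejects keys at or above it. A standalone sketch of the same rule, with plain Strings standing in for the WritableComparable HStoreKeys (row names invented for the demo):

    import java.io.IOException;

    public class HalfFileCheckSketch {
      // Mirrors HalfMapFileReader.checkKey().
      static void checkKey(boolean top, String key, String midkey)
      throws IOException {
        if (top) {
          if (key.compareTo(midkey) < 0) {
            throw new IOException("Key is less than midKey of backing mapfile");
          }
        } else if (key.compareTo(midkey) >= 0) {
          throw new IOException("Key is >= midKey of backing mapfile");
        }
      }

      public static void main(String[] args) throws IOException {
        checkKey(true, "row500", "row400");  // ok: top half serves >= midkey
        checkKey(false, "row100", "row400"); // ok: bottom half serves < midkey
        try {
          checkKey(false, "row500", "row400");
        } catch (IOException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }
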
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,269 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Scanner scans both the memcache and the HStore
+ */
+class HStoreScanner implements InternalScanner {
+ static final Log LOG = LogFactory.getLog(HStoreScanner.class);
+
+ private InternalScanner[] scanners;
+ private TreeMap<byte [], Cell>[] resultSets;
+ private HStoreKey[] keys;
+ private boolean wildcardMatch = false;
+ private boolean multipleMatchers = false;
+ private RowFilterInterface dataFilter;
+ private HStore store;
+
+  /** Create a Scanner with a handle on the memcache and HStore files. */
+ @SuppressWarnings("unchecked")
+ HStoreScanner(HStore store, byte [][] targetCols, byte [] firstRow,
+ long timestamp, RowFilterInterface filter)
+ throws IOException {
+ this.store = store;
+ this.dataFilter = filter;
+ if (null != dataFilter) {
+ dataFilter.reset();
+ }
+ this.scanners = new InternalScanner[2];
+ this.resultSets = new TreeMap[scanners.length];
+ this.keys = new HStoreKey[scanners.length];
+
+ try {
+ scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
+ scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
+ for (int i = 0; i < scanners.length; i++) {
+ if (scanners[i].isWildcardScanner()) {
+ this.wildcardMatch = true;
+ }
+ if (scanners[i].isMultipleMatchScanner()) {
+ this.multipleMatchers = true;
+ }
+ }
+ } catch(IOException e) {
+ for (int i = 0; i < this.scanners.length; i++) {
+ if(scanners[i] != null) {
+ closeScanner(i);
+ }
+ }
+ throw e;
+ }
+
+ // Advance to the first key in each scanner.
+ // All results will match the required column-set and scanTime.
+ for (int i = 0; i < scanners.length; i++) {
+ keys[i] = new HStoreKey();
+ resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+ if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+
+ /** @return true if the scanner is a wild card scanner */
+ public boolean isWildcardScanner() {
+ return wildcardMatch;
+ }
+
+ /** @return true if the scanner is a multiple match scanner */
+ public boolean isMultipleMatchScanner() {
+ return multipleMatchers;
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ throws IOException {
+
+ // Filtered flag is set by filters. If a cell has been 'filtered out'
+ // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
+ boolean filtered = true;
+ boolean moreToFollow = true;
+ while (filtered && moreToFollow) {
+ // Find the lowest-possible key.
+ byte [] chosenRow = null;
+ long chosenTimestamp = -1;
+ for (int i = 0; i < this.keys.length; i++) {
+ if (scanners[i] != null &&
+ (chosenRow == null ||
+ (Bytes.compareTo(keys[i].getRow(), chosenRow) < 0) ||
+ ((Bytes.compareTo(keys[i].getRow(), chosenRow) == 0) &&
+ (keys[i].getTimestamp() > chosenTimestamp)))) {
+ chosenRow = keys[i].getRow();
+ chosenTimestamp = keys[i].getTimestamp();
+ }
+ }
+
+ // Filter whole row by row key?
+ filtered = dataFilter != null? dataFilter.filterRowKey(chosenRow) : false;
+
+ // Store the key and results for each sub-scanner. Merge them as
+ // appropriate.
+ if (chosenTimestamp >= 0 && !filtered) {
+ // Here we are setting the passed in key with current row+timestamp
+ key.setRow(chosenRow);
+ key.setVersion(chosenTimestamp);
+ key.setColumn(HConstants.EMPTY_BYTE_ARRAY);
+ // Keep list of deleted cell keys within this row. We need this
+ // because as we go through scanners, the delete record may be in an
+ // early scanner and then the same record with a non-delete, non-null
+        // value in a later one. Without history of what we've seen, we'll return
+ // deleted values. This List should not ever grow too large since we
+ // are only keeping rows and columns that match those set on the
+ // scanner and which have delete values. If memory usage becomes a
+ // problem, could redo as bloom filter.
+ List<HStoreKey> deletes = new ArrayList<HStoreKey>();
+ for (int i = 0; i < scanners.length && !filtered; i++) {
+ while ((scanners[i] != null
+ && !filtered
+ && moreToFollow)
+ && (Bytes.compareTo(keys[i].getRow(), chosenRow) == 0)) {
+ // If we are doing a wild card match or there are multiple
+ // matchers per column, we need to scan all the older versions of
+ // this row to pick up the rest of the family members
+ if (!wildcardMatch
+ && !multipleMatchers
+ && (keys[i].getTimestamp() != chosenTimestamp)) {
+ break;
+ }
+
+ // NOTE: We used to do results.putAll(resultSets[i]);
+ // but this had the effect of overwriting newer
+ // values with older ones. So now we only insert
+ // a result if the map does not contain the key.
+ HStoreKey hsk = new HStoreKey(key.getRow(), HConstants.EMPTY_BYTE_ARRAY,
+ key.getTimestamp());
+ for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) {
+ hsk.setColumn(e.getKey());
+ if (HLogEdit.isDeleted(e.getValue().getValue())) {
+ if (!deletes.contains(hsk)) {
+ // Key changes as we cycle the for loop so add a copy to
+ // the set of deletes.
+ deletes.add(new HStoreKey(hsk));
+ }
+ } else if (!deletes.contains(hsk) &&
+ !filtered &&
+ moreToFollow &&
+ !results.containsKey(e.getKey())) {
+ if (dataFilter != null) {
+ // Filter whole row by column data?
+ filtered = dataFilter.filterColumn(chosenRow, e.getKey(),
+ e.getValue().getValue());
+ if (filtered) {
+ results.clear();
+ break;
+ }
+ }
+ results.put(e.getKey(), e.getValue());
+ }
+ }
+ resultSets[i].clear();
+ if (!scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+ }
+
+ for (int i = 0; i < scanners.length; i++) {
+ // If the current scanner is non-null AND has a lower-or-equal
+ // row label, then its timestamp is bad. We need to advance it.
+ while ((scanners[i] != null) &&
+ (Bytes.compareTo(keys[i].getRow(), chosenRow) <= 0)) {
+ resultSets[i].clear();
+ if (!scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+
+ moreToFollow = chosenTimestamp >= 0;
+
+ if (dataFilter != null) {
+ if (dataFilter.filterAllRemaining()) {
+ moreToFollow = false;
+ }
+ }
+
+ if (results.size() <= 0 && !filtered) {
+        // There were no results found for this row. Mark it as
+        // 'filtered' out, otherwise we will not move on to the next row.
+ filtered = true;
+ }
+ }
+
+ // If we got no results, then there is no more to follow.
+ if (results == null || results.size() <= 0) {
+ moreToFollow = false;
+ }
+
+ // Make sure scanners closed if no more results
+ if (!moreToFollow) {
+ for (int i = 0; i < scanners.length; i++) {
+ if (null != scanners[i]) {
+ closeScanner(i);
+ }
+ }
+ }
+
+ return moreToFollow;
+ }
+
+ /** Shut down a single scanner */
+ void closeScanner(int i) {
+ try {
+ try {
+ scanners[i].close();
+ } catch (IOException e) {
+ LOG.warn(store.storeName + " failed closing scanner " + i, e);
+ }
+ } finally {
+ scanners[i] = null;
+ keys[i] = null;
+ resultSets[i] = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close() {
+ for(int i = 0; i < scanners.length; i++) {
+ if(scanners[i] != null) {
+ closeScanner(i);
+ }
+ }
+ }
+}
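
The pivot of HStoreScanner.next() is the selection loop near its top: among the still-open sub-scanners, take the lowest row, and on a row tie prefer the largest (newest) timestamp. A self-contained sketch of just that rule; KeyTs is a hypothetical stand-in for HStoreKey, and compare() mimics the unsigned lexicographic order of Bytes.compareTo:

    public class ChooseLowestSketch {
      // Minimal stand-in for HStoreKey: a row plus a timestamp.
      static class KeyTs {
        final byte[] row;
        final long ts;
        KeyTs(byte[] row, long ts) { this.row = row; this.ts = ts; }
      }

      // Unsigned lexicographic compare, like Bytes.compareTo.
      static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0) {
            return d;
          }
        }
        return a.length - b.length;
      }

      // Lowest row wins; on a row tie, the newer timestamp wins. A null
      // slot means that sub-scanner has been closed.
      static KeyTs chooseLowest(KeyTs[] keys) {
        KeyTs chosen = null;
        for (KeyTs k : keys) {
          if (k == null) {
            continue;
          }
          if (chosen == null
              || compare(k.row, chosen.row) < 0
              || (compare(k.row, chosen.row) == 0 && k.ts > chosen.ts)) {
            chosen = k;
          }
        }
        return chosen;
      }

      public static void main(String[] args) {
        KeyTs[] keys = {
          new KeyTs("rowB".getBytes(), 10L),
          new KeyTs("rowA".getBytes(), 5L),
          new KeyTs("rowA".getBytes(), 9L),
        };
        KeyTs c = chooseLowest(keys);
        System.out.println(new String(c.row) + " @ " + c.ts); // rowA @ 9
      }
    }
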
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,328 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * HTableDescriptor contains the name of an HTable, and its
+ * column families.
+ */
+public class HTableDescriptor implements WritableComparable {
+  /** Table descriptor for <code>-ROOT-</code> catalog table */
+ public static final HTableDescriptor ROOT_TABLEDESC = new HTableDescriptor(
+ HConstants.ROOT_TABLE_NAME,
+ new HColumnDescriptor[] { new HColumnDescriptor(HConstants.COLUMN_FAMILY,
+ 1, HColumnDescriptor.CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false) });
+
+ /** Table descriptor for <code>.META.</code> catalog table */
+ public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
+ HConstants.META_TABLE_NAME, new HColumnDescriptor[] {
+ new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
+ HColumnDescriptor.CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false),
+ new HColumnDescriptor(HConstants.COLUMN_FAMILY_HISTORIAN,
+ HConstants.ALL_VERSIONS, HColumnDescriptor.CompressionType.NONE,
+ false, false, Integer.MAX_VALUE, HConstants.FOREVER, false) });
+
+ private boolean rootregion = false;
+ private boolean metaregion = false;
+ private byte [] name = HConstants.EMPTY_BYTE_ARRAY;
+ private String nameAsString = "";
+
+ public static final String FAMILIES = "FAMILIES";
+
+ // Key is hash of the family name.
+ private final Map<Integer, HColumnDescriptor> families =
+ new HashMap<Integer, HColumnDescriptor>();
+
+ /**
+   * Private constructor used internally to create table descriptors for
+ * catalog tables: e.g. .META. and -ROOT-.
+ */
+ private HTableDescriptor(final byte [] name, HColumnDescriptor[] families) {
+ this.name = name.clone();
+ setMetaFlags(name);
+ for(HColumnDescriptor descriptor : families) {
+ this.families.put(Bytes.mapKey(descriptor.getName()), descriptor);
+ }
+ }
+
+ /**
+ * Constructs an empty object.
+ * For deserializing an HTableDescriptor instance only.
+ * @see #HTableDescriptor(byte[])
+ */
+ public HTableDescriptor() {
+ super();
+ }
+
+ /**
+ * Constructor.
+ * @param name Table name.
+ * @throws IllegalArgumentException if passed a table name
+ * that is made of other than 'word' characters, underscore or period: i.e.
+   * <code>[a-zA-Z_0-9.]</code>.
+ * @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a>
+ */
+ public HTableDescriptor(final String name) {
+ this(Bytes.toBytes(name));
+ }
+
+ /**
+ * Constructor.
+ * @param name Table name.
+ * @throws IllegalArgumentException if passed a table name
+ * that is made of other than 'word' characters, underscore or period: i.e.
+   * <code>[a-zA-Z_0-9.]</code>.
+ * @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a>
+ */
+ public HTableDescriptor(final byte [] name) {
+ setMetaFlags(name);
+ this.name = this.metaregion? name: isLegalTableName(name);
+ this.nameAsString = Bytes.toString(this.name);
+ }
+
+ /*
+ * Set meta flags on this table.
+ * Called by constructors.
+ * @param name
+ */
+ private void setMetaFlags(final byte [] name) {
+ this.rootregion = Bytes.equals(name, HConstants.ROOT_TABLE_NAME);
+ this.metaregion =
+ this.rootregion? true: Bytes.equals(name, HConstants.META_TABLE_NAME);
+ }
+
+ /**
+ * Check passed buffer is legal user-space table name.
+ * @param b Table name.
+ * @return Returns passed <code>b</code> param
+ * @throws NullPointerException If passed <code>b</code> is null
+ * @throws IllegalArgumentException if passed a table name
+ * that is made of other than 'word' characters or underscores: i.e.
+   * <code>[a-zA-Z_0-9]</code>.
+ */
+ public static byte [] isLegalTableName(final byte [] b) {
+ if (b == null || b.length <= 0) {
+ throw new IllegalArgumentException("Name is null or empty");
+ }
+ for (int i = 0; i < b.length; i++) {
+ if (Character.isLetterOrDigit(b[i]) || b[i] == '_') {
+ continue;
+ }
+ throw new IllegalArgumentException("Illegal character <" + b[i] + ">. " +
+ "User-space table names can only contain 'word characters':" +
+ "i.e. [a-zA-Z_0-9]: " + Bytes.toString(b));
+ }
+ return b;
+ }
+
+ /** @return true if this is the root region */
+ public boolean isRootRegion() {
+ return rootregion;
+ }
+
+ /** @return true if table is the meta table */
+ public boolean isMetaTable() {
+ return metaregion && !rootregion;
+ }
+
+ /** @return true if this is a meta region (part of the root or meta tables) */
+ public boolean isMetaRegion() {
+ return metaregion;
+ }
+
+ /** @return name of table */
+ public byte [] getName() {
+ return name;
+ }
+
+ /** @return name of table */
+ public String getNameAsString() {
+ return this.nameAsString;
+ }
+
+ /**
+ * Adds a column family.
+   * @param family HColumnDescriptor of family to add.
+ */
+ public void addFamily(final HColumnDescriptor family) {
+ if (family.getName() == null || family.getName().length <= 0) {
+ throw new NullPointerException("Family name cannot be null or empty");
+ }
+ this.families.put(Bytes.mapKey(family.getName()), family);
+ }
+
+ /**
+ * Checks to see if this table contains the given column family
+ * @param c Family name or column name.
+ * @return true if the table contains the specified family name
+ */
+ public boolean hasFamily(final byte [] c) {
+ return hasFamily(c, HStoreKey.getFamilyDelimiterIndex(c));
+ }
+
+ /**
+ * Checks to see if this table contains the given column family
+ * @param c Family name or column name.
+ * @param index Index to column family delimiter
+ * @return true if the table contains the specified family name
+ */
+ public boolean hasFamily(final byte [] c, final int index) {
+ // If index is -1, then presume we were passed a column family name minus
+ // the colon delimiter.
+ return families.containsKey(Bytes.mapKey(c, index == -1? c.length: index));
+ }
+
+ /**
+ * @return Name of this table and then a map of all of the column family
+ * descriptors.
+ * @see #getNameAsString()
+ */
+ @Override
+ public String toString() {
+ return HConstants.NAME + " => '" + Bytes.toString(this.name) +
+ "', " + FAMILIES + " => " + this.families.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object obj) {
+ return compareTo(obj) == 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ // TODO: Cache.
+ int result = Bytes.hashCode(this.name);
+ if (this.families != null && this.families.size() > 0) {
+ for (HColumnDescriptor e: this.families.values()) {
+ result ^= e.hashCode();
+ }
+ }
+ return result;
+ }
+
+ // Writable
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(rootregion);
+ out.writeBoolean(metaregion);
+ Bytes.writeByteArray(out, name);
+ out.writeInt(families.size());
+ for(Iterator<HColumnDescriptor> it = families.values().iterator();
+ it.hasNext(); ) {
+ it.next().write(out);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ this.rootregion = in.readBoolean();
+ this.metaregion = in.readBoolean();
+ this.name = Bytes.readByteArray(in);
+ this.nameAsString = Bytes.toString(this.name);
+ int numCols = in.readInt();
+ this.families.clear();
+ for (int i = 0; i < numCols; i++) {
+ HColumnDescriptor c = new HColumnDescriptor();
+ c.readFields(in);
+ this.families.put(Bytes.mapKey(c.getName()), c);
+ }
+ }
+
+ // Comparable
+
+ /** {@inheritDoc} */
+ public int compareTo(Object o) {
+ HTableDescriptor other = (HTableDescriptor) o;
+ int result = Bytes.compareTo(this.name, other.name);
+ if (result == 0) {
+ result = families.size() - other.families.size();
+ }
+
+ if (result == 0) {
+ for (Iterator<HColumnDescriptor> it = families.values().iterator(),
+ it2 = other.families.values().iterator(); it.hasNext(); ) {
+ result = it.next().compareTo(it2.next());
+ if (result != 0) {
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+   * @return Immutable collection of families.
+ */
+ public Collection<HColumnDescriptor> getFamilies() {
+ return Collections.unmodifiableCollection(this.families.values());
+ }
+
+ /**
+ * @param column
+   * @return Column descriptor for the passed family name, or for the
+   * family of the passed-in column.
+ */
+ public HColumnDescriptor getFamily(final byte [] column) {
+ return this.families.get(HStoreKey.getFamilyMapKey(column));
+ }
+
+ /**
+ * @param column
+   * @return Column descriptor for the passed family name, or for the
+   * family of the passed-in column.
+ */
+ public HColumnDescriptor removeFamily(final byte [] column) {
+ return this.families.remove(HStoreKey.getFamilyMapKey(column));
+ }
+
+ /**
+ * @param rootdir qualified path of HBase root directory
+ * @param tableName name of table
+ * @return path for table
+ */
+ public static Path getTableDir(Path rootdir, final byte [] tableName) {
+ return new Path(rootdir, Bytes.toString(tableName));
+ }
+}
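
isLegalTableName admits only word characters and underscores in user-space table names (the catalog tables bypass the check via the metaregion flag in the byte[] constructor). A standalone copy of the rule with a quick accept/reject demo; the table names are made up:

    public class TableNameCheckSketch {
      // Same rule as HTableDescriptor.isLegalTableName: [a-zA-Z_0-9] only.
      static byte[] isLegalTableName(byte[] b) {
        if (b == null || b.length <= 0) {
          throw new IllegalArgumentException("Name is null or empty");
        }
        for (byte x : b) {
          if (!Character.isLetterOrDigit((char) x) && x != '_') {
            throw new IllegalArgumentException("Illegal character <" + x + ">");
          }
        }
        return b;
      }

      public static void main(String[] args) {
        isLegalTableName("web_pages2".getBytes());  // accepted
        try {
          isLegalTableName("web-pages".getBytes()); // '-' is rejected
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }
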
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+/**
+ * Mechanism by which the HLog requests a log roll
+ */
+public interface LogRollListener {
+ /** Request that the log be rolled */
+ public void logRollRequested();
+}
\ No newline at end of file
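
LogRollListener is a single-method callback: the HLog asks for a roll and the owner of the log decides when to do it. A hypothetical implementer, not part of this patch, that merely latches the request for a worker thread to act on (assumes it sits in the same package as the interface):

    public class FlagLogRollListener implements LogRollListener {
      // Volatile so the requesting thread's write is visible to the worker.
      private volatile boolean rollRequested = false;

      public void logRollRequested() {
        rollRequested = true;
      }

      /** A worker thread can poll this and roll the log when true. */
      public boolean isRollRequested() {
        return rollRequested;
      }
    }
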
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,760 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.IOException;
+import java.rmi.UnexpectedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.HAbstractScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * The Memcache holds in-memory modifications to the HRegion.
+ * Keeps a current map. When asked to flush the map, current map is moved
+ * to snapshot and is cleared. We continue to serve edits out of new map
+ * and backing snapshot until flusher reports in that the flush succeeded. At
+ * this point we let the snapshot go.
+ */
+class Memcache {
+ private final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+ private long ttl;
+
+  // Note that since these structures are always accessed with a lock held,
+  // no additional synchronization is required.
+
+ // The currently active sorted map of edits.
+ private volatile SortedMap<HStoreKey, byte[]> memcache =
+ createSynchronizedSortedMap();
+
+ // Snapshot of memcache. Made for flusher.
+ private volatile SortedMap<HStoreKey, byte[]> snapshot =
+ createSynchronizedSortedMap();
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * Default constructor. Used for tests.
+ */
+ public Memcache()
+ {
+ ttl = HConstants.FOREVER;
+ }
+
+ /**
+ * Constructor.
+ * @param ttl The TTL for cache entries, in milliseconds.
+ */
+ public Memcache(long ttl) {
+ this.ttl = ttl;
+ }
+
+ /*
+ * Utility method.
+   * @return synchronized sorted map of HStoreKey to byte arrays.
+ */
+ private static SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap() {
+ return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+ }
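+
+  // Note (an added observation): per the Collections.synchronizedSortedMap
+  // contract, iterating the returned map -- or its tailMap/headMap views --
+  // still requires manually synchronizing on the map itself, which is why
+  // the getters below wrap their iterations in synchronized blocks.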
+
+  /**
+   * Creates a snapshot of the current Memcache.
+   * The snapshot must be cleared by a call to
+   * {@link #clearSnapshot(SortedMap)}. To get the snapshot made by this
+   * method, use {@link #getSnapshot()}.
+   */
+ void snapshot() {
+ this.lock.writeLock().lock();
+ try {
+      // If snapshot currently has entries, then flusher failed or didn't call
+      // cleanup. Log at debug level.
+      if (this.snapshot.size() > 0) {
+        LOG.debug("Snapshot called again without clearing previous. " +
+          "Doing nothing. Another ongoing flush or did we fail last attempt?");
+ } else {
+ // We used to synchronize on the memcache here but we're inside a
+ // write lock so removed it. Comment is left in case removal was a
+ // mistake. St.Ack
+ if (this.memcache.size() != 0) {
+ this.snapshot = this.memcache;
+ this.memcache = createSynchronizedSortedMap();
+ }
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+  /**
+   * Return the current snapshot.
+   * Called by flusher to get the snapshot made by a previous
+   * call to {@link #snapshot()}.
+   * @return The snapshot.
+   * @see #snapshot()
+   * @see #clearSnapshot(SortedMap)
+   */
+ SortedMap<HStoreKey, byte[]> getSnapshot() {
+ return this.snapshot;
+ }
+
+ /**
+ * The passed snapshot was successfully persisted; it can be let go.
+ * @param ss The snapshot to clean out.
+   * @throws UnexpectedException If <code>ss</code> is not the current snapshot.
+   * @see #snapshot()
+ */
+ void clearSnapshot(final SortedMap<HStoreKey, byte []> ss)
+ throws UnexpectedException {
+ this.lock.writeLock().lock();
+ try {
+ if (this.snapshot != ss) {
+ throw new UnexpectedException("Current snapshot is " +
+ this.snapshot + ", was passed " + ss);
+ }
+ // OK. Passed in snapshot is same as current snapshot. If not-empty,
+ // create a new snapshot and let the old one go.
+ if (ss.size() != 0) {
+ this.snapshot = createSynchronizedSortedMap();
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+  /**
+   * Write an update.
+   * @param key The key to store under.
+   * @param value The value to store.
+   * @return Memcache size delta, in bytes.
+   */
+ long add(final HStoreKey key, final byte[] value) {
+ this.lock.readLock().lock();
+ try {
+ byte[] oldValue = this.memcache.remove(key);
+ this.memcache.put(key, value);
+ return key.getSize() + (value == null ? 0 : value.length) -
+ (oldValue == null ? 0 : oldValue.length);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
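+
+  // Example of the delta accounting above (an added note): replacing a
+  // 10-byte value with a 4-byte value under the same key returns
+  // key.getSize() - 6; the key's size is charged on every put while the
+  // displaced value's length is credited back.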
+
+  /**
+   * Look back through all the backlog TreeMaps to find the target.
+   * @param key The key to look for.
+   * @param numVersions How many versions to return at most.
+   * @return A list of cells ordered by timestamp, newest first.
+   */
+ List<Cell> get(final HStoreKey key, final int numVersions) {
+ this.lock.readLock().lock();
+ try {
+ List<Cell> results;
+      // The synchronizations here are because internalGet iterates the
+      // passed map; iterating a synchronized map requires holding its lock.
+ synchronized (this.memcache) {
+ results = internalGet(this.memcache, key, numVersions);
+ }
+ synchronized (this.snapshot) {
+ results.addAll(results.size(),
+ internalGet(this.snapshot, key, numVersions - results.size()));
+ }
+ return results;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+  /**
+   * @param a
+   * @param b
+   * @return The lower of a and b, or null if both a and b are null.
+   */
+  private byte [] getLowest(final byte [] a,
+      final byte [] b) {
+ if (a == null) {
+ return b;
+ }
+ if (b == null) {
+ return a;
+ }
+ return Bytes.compareTo(a, b) <= 0? a: b;
+ }
+
+ /**
+ * @param row Find the row that comes after this one.
+ * @return Next row or null if none found
+ */
+ byte [] getNextRow(final byte [] row) {
+ this.lock.readLock().lock();
+ try {
+ return getLowest(getNextRow(row, this.memcache),
+ getNextRow(row, this.snapshot));
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /*
+ * @param row Find row that follows this one.
+ * @param map Map to look in for a row beyond <code>row</code>.
+ * This method synchronizes on passed map while iterating it.
+ * @return Next row or null if none found.
+ */
+ private byte [] getNextRow(final byte [] row,
+ final SortedMap<HStoreKey, byte []> map) {
+ byte [] result = null;
+    // Synchronize on the map to make the tailMap creation and iteration safe.
+    synchronized (map) {
+      // Make an HSK with maximum timestamp so we get past most of the current
+      // row's cell entries.
+ HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP);
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk);
+ // Iterate until we fall into the next row; i.e. move off current row
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey itKey = es.getKey();
+ if (Bytes.compareTo(itKey.getRow(), row) <= 0) {
+ continue;
+ }
+ // Note: Not suppressing deletes or expired cells.
+ result = itKey.getRow();
+ break;
+ }
+ }
+ return result;
+ }
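+
+  // Illustrative example (an added note): with rows "a" and "b" in the map,
+  // getNextRow("a") takes the tailMap from ("a", LATEST_TIMESTAMP), skips
+  // the remaining "a" entries via the compareTo check, and returns "b".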
+
+ /**
+ * Return all the available columns for the given key. The key indicates a
+ * row and timestamp, but not a column name.
+ * @param key
+ * @param columns Pass null for all columns else the wanted subset.
+ * @param deletes Map to accumulate deletes found.
+ * @param results Where to stick row results found.
+ */
+ void getFull(HStoreKey key, Set<byte []> columns, Map<byte [], Long> deletes,
+ Map<byte [], Cell> results) {
+ this.lock.readLock().lock();
+ try {
+      // The synchronizations here are because internalGetFull iterates the
+      // passed map; iterating a synchronized map requires holding its lock.
+ synchronized (this.memcache) {
+ internalGetFull(this.memcache, key, columns, deletes, results);
+ }
+ synchronized (this.snapshot) {
+ internalGetFull(this.snapshot, key, columns, deletes, results);
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ private void internalGetFull(SortedMap<HStoreKey, byte[]> map, HStoreKey key,
+ Set<byte []> columns, Map<byte [], Long> deletes,
+ Map<byte [], Cell> results) {
+ if (map.isEmpty() || key == null) {
+ return;
+ }
+ List<HStoreKey> victims = new ArrayList<HStoreKey>();
+ SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
+ long now = System.currentTimeMillis();
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey itKey = es.getKey();
+ byte [] itCol = itKey.getColumn();
+ if (results.get(itCol) == null && key.matchesWithoutColumn(itKey)) {
+ if (columns == null || columns.contains(itKey.getColumn())) {
+ byte [] val = tailMap.get(itKey);
+ if (HLogEdit.isDeleted(val)) {
+ if (!deletes.containsKey(itCol)
+ || deletes.get(itCol).longValue() < itKey.getTimestamp()) {
+ deletes.put(itCol, Long.valueOf(itKey.getTimestamp()));
+ }
+ } else if (!(deletes.containsKey(itCol)
+ && deletes.get(itCol).longValue() >= itKey.getTimestamp())) {
+ // Skip expired cells
+ if (ttl == HConstants.FOREVER ||
+ now < itKey.getTimestamp() + ttl) {
+ results.put(itCol, new Cell(val, itKey.getTimestamp()));
+ } else {
+ victims.add(itKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("internalGetFull: " + itKey + ": expired, skipped");
+ }
+ }
+ }
+ }
+ } else if (Bytes.compareTo(key.getRow(), itKey.getRow()) < 0) {
+ break;
+ }
+ }
+ // Remove expired victims from the map.
+    for (HStoreKey v: victims) {
+      map.remove(v);
+    }
+ }
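+
+  // Sketch of the accumulation contract above (an added note): for each
+  // column, only the newest cell is kept (the results.get(itCol) == null
+  // check), a delete stamped t suppresses cells stamped at or before t,
+  // and expired cells are collected as victims and removed afterwards.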
+
+ /**
+ * @param row Row to look for.
+   * @param candidateKeys Map of candidate keys (an accumulation over many
+   * lookups across stores and memcaches).
+ */
+ void getRowKeyAtOrBefore(final byte [] row,
+ SortedMap<HStoreKey, Long> candidateKeys) {
+ this.lock.readLock().lock();
+ try {
+ synchronized (memcache) {
+ internalGetRowKeyAtOrBefore(memcache, row, candidateKeys);
+ }
+ synchronized (snapshot) {
+ internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys);
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ private void internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
+ byte [] key, SortedMap<HStoreKey, Long> candidateKeys) {
+ HStoreKey strippedKey = null;
+
+    // we want the earliest possible key to start searching from
+ HStoreKey search_key = candidateKeys.isEmpty() ?
+ new HStoreKey(key) : new HStoreKey(candidateKeys.firstKey().getRow());
+ Iterator<HStoreKey> key_iterator = null;
+ HStoreKey found_key = null;
+ ArrayList<HStoreKey> victims = new ArrayList<HStoreKey>();
+ long now = System.currentTimeMillis();
+ // get all the entries that come equal or after our search key
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);
+
+ // if there are items in the tail map, there's either a direct match to
+ // the search key, or a range of values between the first candidate key
+ // and the ultimate search key (or the end of the cache)
+ if (!tailMap.isEmpty() &&
+ Bytes.compareTo(tailMap.firstKey().getRow(), key) <= 0) {
+ key_iterator = tailMap.keySet().iterator();
+
+ // keep looking at cells as long as they are no greater than the
+ // ultimate search key and there's still records left in the map.
+ do {
+ found_key = key_iterator.next();
+ if (Bytes.compareTo(found_key.getRow(), key) <= 0) {
+ strippedKey = stripTimestamp(found_key);
+ if (HLogEdit.isDeleted(tailMap.get(found_key))) {
+ if (candidateKeys.containsKey(strippedKey)) {
+ long bestCandidateTs =
+ candidateKeys.get(strippedKey).longValue();
+ if (bestCandidateTs <= found_key.getTimestamp()) {
+ candidateKeys.remove(strippedKey);
+ }
+ }
+ } else {
+ if (ttl == HConstants.FOREVER ||
+ now < found_key.getTimestamp() + ttl) {
+            candidateKeys.put(strippedKey,
+              Long.valueOf(found_key.getTimestamp()));
+ } else {
+ victims.add(found_key);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(":" + found_key + ": expired, skipped");
+ }
+ }
+ }
+ }
+ } while (Bytes.compareTo(found_key.getRow(), key) <= 0
+ && key_iterator.hasNext());
+ } else {
+      // the tail didn't contain any keys that matched our criteria, or was
+      // empty. Examine all the keys that precede our splitting point.
+ SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
+
+ // if we tried to create a headMap and got an empty map, then there are
+ // no keys at or before the search key, so we're done.
+ if (headMap.isEmpty()) {
+ return;
+ }
+
+ // if there aren't any candidate keys at this point, we need to search
+ // backwards until we find at least one candidate or run out of headMap.
+ if (candidateKeys.isEmpty()) {
+ HStoreKey[] cells =
+ headMap.keySet().toArray(new HStoreKey[headMap.keySet().size()]);
+
+ byte [] lastRowFound = null;
+ for(int i = cells.length - 1; i >= 0; i--) {
+ HStoreKey thisKey = cells[i];
+
+ // if the last row we found a candidate key for is different than
+ // the row of the current candidate, we can stop looking.
+ if (lastRowFound != null &&
+ !Bytes.equals(lastRowFound, thisKey.getRow())) {
+ break;
+ }
+
+ // if this isn't a delete, record it as a candidate key. also
+ // take note of the row of this candidate so that we'll know when
+ // we cross the row boundary into the previous row.
+          if (!HLogEdit.isDeleted(headMap.get(thisKey))) {
+            if (ttl == HConstants.FOREVER ||
+                now < thisKey.getTimestamp() + ttl) {
+              lastRowFound = thisKey.getRow();
+              candidateKeys.put(stripTimestamp(thisKey),
+                Long.valueOf(thisKey.getTimestamp()));
+            } else {
+              victims.add(thisKey);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("internalGetRowKeyAtOrBefore: " + thisKey +
+                  ": expired, skipped");
+              }
+            }
+          }
+ }
+ } else {
+ // if there are already some candidate keys, we only need to consider
+ // the very last row's worth of keys in the headMap, because any
+ // smaller acceptable candidate keys would have caused us to start
+ // our search earlier in the list, and we wouldn't be searching here.
+ SortedMap<HStoreKey, byte[]> thisRowTailMap =
+ headMap.tailMap(new HStoreKey(headMap.lastKey().getRow()));
+
+ key_iterator = thisRowTailMap.keySet().iterator();
+
+ do {
+ found_key = key_iterator.next();
+
+ if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) {
+ strippedKey = stripTimestamp(found_key);
+ if (candidateKeys.containsKey(strippedKey)) {
+ long bestCandidateTs =
+ candidateKeys.get(strippedKey).longValue();
+ if (bestCandidateTs <= found_key.getTimestamp()) {
+ candidateKeys.remove(strippedKey);
+ }
+ }
+ } else {
+ if (ttl == HConstants.FOREVER ||
+ now < found_key.getTimestamp() + ttl) {
+ candidateKeys.put(stripTimestamp(found_key),
+ Long.valueOf(found_key.getTimestamp()));
+ } else {
+ victims.add(found_key);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("internalGetRowKeyAtOrBefore: " + found_key +
+ ": expired, skipped");
+ }
+ }
+ }
+ } while (key_iterator.hasNext());
+ }
+ }
+ // Remove expired victims from the map.
+    for (HStoreKey victim: victims) {
+      map.remove(victim);
+    }
+ }
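+
+  // In summary (an added note): the tailMap pass refines candidates when the
+  // map holds rows at or before the target; failing that, the headMap pass
+  // either seeds candidates from the nearest preceding row, or reconsiders
+  // only the last row's worth of keys once candidates already exist.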
+
+ static HStoreKey stripTimestamp(HStoreKey key) {
+ return new HStoreKey(key.getRow(), key.getColumn());
+ }
+
+ /**
+ * Examine a single map for the desired key.
+ *
+ * TODO - This is kinda slow. We need a data structure that allows for
+ * proximity-searches, not just precise-matches.
+ *
+ * @param map
+ * @param key
+ * @param numVersions
+ * @return Ordered list of items found in passed <code>map</code>. If no
+ * matching values, returns an empty list (does not return null).
+ */
+ private ArrayList<Cell> internalGet(
+ final SortedMap<HStoreKey, byte []> map, final HStoreKey key,
+ final int numVersions) {
+
+ ArrayList<Cell> result = new ArrayList<Cell>();
+
+ // TODO: If get is of a particular version -- numVersions == 1 -- we
+ // should be able to avoid all of the tailmap creations and iterations
+ // below.
+ long now = System.currentTimeMillis();
+ List<HStoreKey> victims = new ArrayList<HStoreKey>();
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey itKey = es.getKey();
+ if (itKey.matchesRowCol(key)) {
+ if (!HLogEdit.isDeleted(es.getValue())) {
+ // Filter out expired results
+ if (ttl == HConstants.FOREVER ||
+ now < itKey.getTimestamp() + ttl) {
+ result.add(new Cell(tailMap.get(itKey), itKey.getTimestamp()));
+ if (numVersions > 0 && result.size() >= numVersions) {
+ break;
+ }
+ } else {
+ victims.add(itKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("internalGet: " + itKey + ": expired, skipped");
+ }
+ }
+ }
+ } else {
+        // Per HBASE-684 (L.N.): the map is sorted, so no later entry can match.
+ break;
+ }
+ }
+ // Remove expired victims from the map.
+ for (HStoreKey v: victims) {
+ map.remove(v);
+ }
+ return result;
+ }
+
+  /**
+   * Get <code>versions</code> keys matching the origin key's
+   * row/column/timestamp and those of an older vintage.
+   * Default access so it can be accessed from {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+   * @return Ordered list of <code>versions</code> keys going from newest back.
+   */
+ List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
+ this.lock.readLock().lock();
+ try {
+ List<HStoreKey> results;
+ synchronized (memcache) {
+ results = internalGetKeys(this.memcache, origin, versions);
+ }
+ synchronized (snapshot) {
+ results.addAll(results.size(), internalGetKeys(snapshot, origin,
+ versions == HConstants.ALL_VERSIONS ? versions :
+ (versions - results.size())));
+ }
+ return results;
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /*
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ * @return List of all keys that are of the same row and column and of
+ * equal or older timestamp. If no keys, returns an empty List. Does not
+ * return null.
+ */
+ private List<HStoreKey> internalGetKeys(
+ final SortedMap<HStoreKey, byte []> map, final HStoreKey origin,
+ final int versions) {
+
+ long now = System.currentTimeMillis();
+ List<HStoreKey> result = new ArrayList<HStoreKey>();
+ List<HStoreKey> victims = new ArrayList<HStoreKey>();
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey key = es.getKey();
+
+      // if the origin has an empty column (no column name), then compare
+      // rows and timestamps only
+ if (origin.getColumn() != null && origin.getColumn().length == 0) {
+ // if the current and origin row don't match, then we can jump
+ // out of the loop entirely.
+ if (!Bytes.equals(key.getRow(), origin.getRow())) {
+ break;
+ }
+ // if the rows match but the timestamp is newer, skip it so we can
+ // get to the ones we actually want.
+ if (key.getTimestamp() > origin.getTimestamp()) {
+ continue;
+ }
+      } else { // compare rows and columns
+ // if the key doesn't match the row and column, then we're done, since
+ // all the cells are ordered.
+ if (!key.matchesRowCol(origin)) {
+ break;
+ }
+ }
+ if (!HLogEdit.isDeleted(es.getValue())) {
+ if (ttl == HConstants.FOREVER || now < key.getTimestamp() + ttl) {
+ result.add(key);
+ } else {
+ victims.add(key);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("internalGetKeys: " + key + ": expired, skipped");
+ }
+ }
+ if (result.size() >= versions) {
+ // We have enough results. Return.
+ break;
+ }
+ }
+ }
+
+ // Clean expired victims from the map.
+    for (HStoreKey v: victims) {
+      map.remove(v);
+    }
+
+ return result;
+ }
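+
+  // Illustrative example of the versions contract (an added note): with
+  // cells for one row/column at timestamps 30, 20 and 10, and an origin
+  // stamped 25, a request for 2 versions returns the keys stamped 20 and
+  // 10; the 30 cell sorts before the origin and never enters the tailMap.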
+
+
+  /**
+   * @param key The key to check.
+   * @return True if there is an entry for <code>key</code> and its content
+   * is the delete marker, {@link HLogEdit#deleteBytes}.
+   * Useful when checking values in the store; on occasion only the memcache
+   * knows that the cell has been deleted.
+   */
+ boolean isDeleted(final HStoreKey key) {
+ return HLogEdit.isDeleted(this.memcache.get(key));
+ }
+
+ /**
+ * @return a scanner over the keys in the Memcache
+ */
+ InternalScanner getScanner(long timestamp,
+ final byte [][] targetCols, final byte [] firstRow)
+ throws IOException {
+ this.lock.readLock().lock();
+ try {
+ return new MemcacheScanner(timestamp, targetCols, firstRow);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
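+
+  /*
+   * Typical scan loop, as a sketch (the timestamp, column and start row
+   * arguments here are illustrative, not taken from calling code):
+   *
+   *   InternalScanner s = memcache.getScanner(HConstants.LATEST_TIMESTAMP,
+   *     new byte [][] {Bytes.toBytes("info:")}, HConstants.EMPTY_START_ROW);
+   *   HStoreKey key = new HStoreKey();
+   *   SortedMap<byte [], Cell> results =
+   *     new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+   *   try {
+   *     while (s.next(key, results)) {
+   *       // ... consume results for the row in 'key' ...
+   *       results.clear();
+   *     }
+   *   } finally {
+   *     s.close();
+   *   }
+   */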
+
+ //////////////////////////////////////////////////////////////////////////////
+ // MemcacheScanner implements the InternalScanner.
+ // It lets the caller scan the contents of the Memcache.
+ //////////////////////////////////////////////////////////////////////////////
+
+ private class MemcacheScanner extends HAbstractScanner {
+ private byte [] currentRow;
+ private Set<byte []> columns = null;
+
+    MemcacheScanner(final long timestamp, final byte [][] targetCols,
+      final byte [] firstRow)
+    throws IOException {
+      // The call to super creates ColumnMatchers and determines whether this
+      // is a regex scanner or not. It also saves away the timestamp and sorts
+      // the rows.
+ super(timestamp, targetCols);
+ this.currentRow = firstRow;
+ // If we're being asked to scan explicit columns rather than all in
+ // a family or columns that match regexes, cache the sorted array of
+ // columns.
+ this.columns = null;
+ if (!isWildcardScanner()) {
+ this.columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ for (int i = 0; i < targetCols.length; i++) {
+ this.columns.add(targetCols[i]);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ throws IOException {
+ if (this.scannerClosed) {
+ return false;
+ }
+      // This is a TreeMap rather than a HashMap so a byte array can serve as
+      // the key: a TreeMap lets me specify the comparator independently.
+ Map<byte [], Long> deletes =
+ new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+      // Catch all row results in here. These results are then filtered to
+      // ensure they match column name regexes, or if none, added to results.
+ Map<byte [], Cell> rowResults =
+ new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+ if (results.size() > 0) {
+ results.clear();
+ }
+ long latestTimestamp = -1;
+ while (results.size() <= 0 && this.currentRow != null) {
+ if (deletes.size() > 0) {
+ deletes.clear();
+ }
+ if (rowResults.size() > 0) {
+ rowResults.clear();
+ }
+ key.setRow(this.currentRow);
+ key.setVersion(this.timestamp);
+ getFull(key, isWildcardScanner() ? null : this.columns, deletes,
+ rowResults);
+ for (Map.Entry<byte [], Long> e: deletes.entrySet()) {
+ rowResults.put(e.getKey(),
+ new Cell(HLogEdit.deleteBytes.get(), e.getValue().longValue()));
+ }
+ for (Map.Entry<byte [], Cell> e: rowResults.entrySet()) {
+ byte [] column = e.getKey();
+ Cell c = e.getValue();
+ if (isWildcardScanner()) {
+ // Check the results match. We only check columns, not timestamps.
+ // We presume that timestamps have been handled properly when we
+ // called getFull.
+ if (!columnMatch(column)) {
+ continue;
+ }
+ }
+ // We should never return HConstants.LATEST_TIMESTAMP as the time for
+ // the row. As a compromise, we return the largest timestamp for the
+ // entries that we find that match.
+ if (c.getTimestamp() != HConstants.LATEST_TIMESTAMP &&
+ c.getTimestamp() > latestTimestamp) {
+ latestTimestamp = c.getTimestamp();
+ }
+ results.put(column, c);
+ }
+        this.currentRow = getNextRow(this.currentRow);
+      }
+ // Set the timestamp to the largest one for the row if we would otherwise
+ // return HConstants.LATEST_TIMESTAMP
+ if (key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+ key.setVersion(latestTimestamp);
+ }
+ return results.size() > 0;
+ }
+
+ /** {@inheritDoc} */
+ public void close() {
+ if (!scannerClosed) {
+ scannerClosed = true;
+ }
+ }
+ }
+}