Posted to commits@hbase.apache.org by br...@apache.org on 2008/02/24 01:19:44 UTC
svn commit: r630550 [4/7] - in /hadoop/hbase/trunk: bin/ conf/
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/
src/java/org/apache/hadoop/hbase/generated/regionserver/ src/java/org/apa...
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=630550&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Feb 23 16:19:34 2008
@@ -0,0 +1,2504 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TextSequence;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.BloomFilterDescriptor;
+import org.onelab.filter.BloomFilter;
+import org.onelab.filter.CountingBloomFilter;
+import org.onelab.filter.Filter;
+import org.onelab.filter.RetouchedBloomFilter;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+
+
+/**
+ * HStore maintains a bunch of data files. It is responsible for maintaining
+ * the memory/file hierarchy and for periodic flushes to disk and compacting
+ * edits to the file.
+ *
+ * Locking and transactions are handled at a higher level. This API should not
+ * be called directly by any writer, but rather by an HRegion manager.
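+ *
+ * <p>A rough usage sketch from the HRegion side (illustrative only; the
+ * real call sequence lives in HRegion and its flush/compaction threads):
+ * <pre>
+ * HStore store = new HStore(basedir, info, family, fs, reconstructionLog, conf);
+ * store.add(new HStoreKey(row, column, timestamp), value); // buffered in memcache
+ * store.flushCache(sequenceId); // write the memcache snapshot as a MapFile
+ * if (store.needsCompaction()) {
+ * store.compact(); // merge MapFiles, dropping deleted cells and old versions
+ * }
+ * </pre>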
+ */
+public class HStore implements HConstants {
+ static final Log LOG = LogFactory.getLog(HStore.class);
+
+ /**
+ * The Memcache holds in-memory modifications to the HRegion. This is really a
+ * wrapper around a TreeMap that helps us when staging the Memcache out to disk.
+ */
+ static class Memcache {
+
+ // Note that since these structures are always accessed with a lock held,
+ // no additional synchronization is required.
+
+ @SuppressWarnings("hiding")
+ private final SortedMap<HStoreKey, byte[]> memcache =
+ Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+
+ volatile SortedMap<HStoreKey, byte[]> snapshot;
+
+ @SuppressWarnings("hiding")
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * Constructor
+ */
+ Memcache() {
+ snapshot =
+ Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+ }
+
+ /**
+ * Creates a snapshot of the current Memcache
+ */
+ void snapshot() {
+ this.lock.writeLock().lock();
+ try {
+ synchronized (memcache) {
+ if (memcache.size() != 0) {
+ snapshot.putAll(memcache);
+ memcache.clear();
+ }
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @return memcache snapshot
+ */
+ SortedMap<HStoreKey, byte[]> getSnapshot() {
+ this.lock.writeLock().lock();
+ try {
+ SortedMap<HStoreKey, byte[]> currentSnapshot = snapshot;
+ snapshot =
+ Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+
+ return currentSnapshot;
+
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
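+
+ // Flush handshake, in brief: HRegion first triggers snapshot() (via
+ // HStore.snapshotMemcache()) to move current edits aside; flushCache()
+ // then drains the frozen map through getSnapshot(), which also installs
+ // a fresh, empty snapshot for the next cycle.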
+
+ /**
+ * Store a value.
+ * @param key
+ * @param value
+ */
+ void add(final HStoreKey key, final byte[] value) {
+ this.lock.readLock().lock();
+ try {
+ memcache.put(key, value);
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Look back through the memcache and the snapshot to find the target.
+ * @param key
+ * @param numVersions
+ * @return A List of byte arrays ordered by timestamp, newest first.
+ */
+ List<byte[]> get(final HStoreKey key, final int numVersions) {
+ this.lock.readLock().lock();
+ try {
+ List<byte []> results;
+ synchronized (memcache) {
+ results = internalGet(memcache, key, numVersions);
+ }
+ synchronized (snapshot) {
+ results.addAll(results.size(),
+ internalGet(snapshot, key, numVersions - results.size()));
+ }
+ return results;
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Return all the available columns for the given key. The key indicates a
+ * row and timestamp, but not a column name.
+ *
+ * Found columns are added to the passed <code>results</code> map, which
+ * maps column names to byte arrays (byte[]).
+ * @param key
+ * @param results
+ */
+ void getFull(HStoreKey key, SortedMap<Text, byte[]> results) {
+ this.lock.readLock().lock();
+ try {
+ synchronized (memcache) {
+ internalGetFull(memcache, key, results);
+ }
+ synchronized (snapshot) {
+ internalGetFull(snapshot, key, results);
+ }
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ private void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key,
+ SortedMap<Text, byte []> results) {
+
+ if (map.isEmpty() || key == null) {
+ return;
+ }
+
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey itKey = es.getKey();
+ Text itCol = itKey.getColumn();
+ if (results.get(itCol) == null && key.matchesWithoutColumn(itKey)) {
+ byte [] val = tailMap.get(itKey);
+
+ if (!HLogEdit.isDeleted(val)) {
+ results.put(itCol, val);
+ }
+
+ } else if (key.getRow().compareTo(itKey.getRow()) < 0) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Find the key that matches <i>row</i> exactly, or the one that immediately
+ * precedes it.
+ */
+ public Text getRowKeyAtOrBefore(final Text row, long timestamp)
+ throws IOException{
+ this.lock.readLock().lock();
+
+ Text key_memcache = null;
+ Text key_snapshot = null;
+
+ try {
+ synchronized (memcache) {
+ key_memcache = internalGetRowKeyAtOrBefore(memcache, row, timestamp);
+ }
+ synchronized (snapshot) {
+ key_snapshot = internalGetRowKeyAtOrBefore(snapshot, row, timestamp);
+ }
+
+ if (key_memcache == null && key_snapshot == null) {
+ // didn't find any candidates, return null
+ return null;
+ } else if (key_memcache == null && key_snapshot != null) {
+ return key_snapshot;
+ } else if (key_memcache != null && key_snapshot == null) {
+ return key_memcache;
+ } else {
+ // if either is a precise match, return the original row.
+ if ( (key_memcache != null && key_memcache.equals(row))
+ || (key_snapshot != null && key_snapshot.equals(row)) ) {
+ return row;
+ }
+ // no precise matches, so return the one that is closer to the search
+ // key (greatest)
+ return key_memcache.compareTo(key_snapshot) > 0 ?
+ key_memcache : key_snapshot;
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ private Text internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
+ Text key, long timestamp) {
+ // TODO: account for deleted cells
+
+ HStoreKey search_key = new HStoreKey(key, timestamp);
+
+ // get all the entries that come equal or after our search key
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);
+
+ // if the first item in the tail has a matching row, then we have an
+ // exact match, and we should return that item
+ if (!tailMap.isEmpty() && tailMap.firstKey().getRow().equals(key)) {
+ // seek forward past any cells that don't fulfill the timestamp
+ // argument
+ Iterator<HStoreKey> key_iterator = tailMap.keySet().iterator();
+ HStoreKey found_key = key_iterator.next();
+
+ // keep seeking so long as we're in the same row, and the timestamp
+ // isn't as small as we'd like, and there are more cells to check
+ while (found_key.getRow().equals(key)
+ && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) {
+ found_key = key_iterator.next();
+ }
+
+ // if this check fails, then we've iterated through all the keys that
+ // match by row, but none match by timestamp, so we fall through to
+ // the headMap case.
+ if (found_key.getTimestamp() <= timestamp) {
+ // this key matches by row and its timestamp qualifies, so return
+ // its row
+/* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/
+ return found_key.getRow();
+ }
+ }
+
+ // the tail didn't contain the key we're searching for, so we should
+ // use the last key in the headmap as the closest before
+ SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
+ return headMap.isEmpty()? null: headMap.lastKey().getRow();
+ }
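+
+ // Worked example (illustrative): for rows {a, c, e} and a search for row
+ // "d", the tailMap starts at "e", whose row is not "d", so we fall through
+ // to headMap("d") = {a, c} and return its last row, "c".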
+
+ /**
+ * Examine a single map for the desired key.
+ *
+ * TODO - This is kinda slow. We need a data structure that allows for
+ * proximity-searches, not just precise-matches.
+ *
+ * @param map
+ * @param key
+ * @param numVersions
+ * @return Ordered list of items found in passed <code>map</code>. If no
+ * matching values, returns an empty list (does not return null).
+ */
+ private ArrayList<byte []> internalGet(
+ final SortedMap<HStoreKey, byte []> map, final HStoreKey key,
+ final int numVersions) {
+
+ ArrayList<byte []> result = new ArrayList<byte []>();
+ // TODO: If get is of a particular version -- numVersions == 1 -- we
+ // should be able to avoid all of the tailmap creations and iterations
+ // below.
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey itKey = es.getKey();
+ if (itKey.matchesRowCol(key)) {
+ if (!HLogEdit.isDeleted(es.getValue())) {
+ result.add(tailMap.get(itKey));
+ }
+ }
+ if (numVersions > 0 && result.size() >= numVersions) {
+ break;
+ }
+ }
+ return result;
+ }
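+
+ // Example (illustrative): HStoreKeys sort by row, then column, then
+ // timestamp newest-first, so for entries (r,c,t3), (r,c,t2), (r,c,t1) a
+ // get at (r,c,t3) with numVersions == 2 returns the t3 and t2 values,
+ // newest first, skipping delete markers.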
+
+ /**
+ * Get <code>versions</code> keys matching the origin key's
+ * row/column/timestamp, and those of an older vintage.
+ * Default access so it can be accessed out of {@link HRegionServer}.
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ * @return Ordered list of <code>versions</code> keys going from newest back.
+ * @throws IOException
+ */
+ List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
+ this.lock.readLock().lock();
+ try {
+ List<HStoreKey> results;
+ synchronized (memcache) {
+ results = internalGetKeys(this.memcache, origin, versions);
+ }
+ synchronized (snapshot) {
+ results.addAll(results.size(), internalGetKeys(snapshot, origin,
+ versions == HConstants.ALL_VERSIONS ? versions :
+ (versions - results.size())));
+ }
+ return results;
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /*
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ * @return List of all keys that are of the same row and column and of
+ * equal or older timestamp. If no keys, returns an empty List. Does not
+ * return null.
+ */
+ private List<HStoreKey> internalGetKeys(final SortedMap<HStoreKey, byte []> map,
+ final HStoreKey origin, final int versions) {
+
+ List<HStoreKey> result = new ArrayList<HStoreKey>();
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey key = es.getKey();
+
+ // if there's no column name, then compare rows and timestamps
+ if (origin.getColumn().toString().equals("")) {
+ // if the current and origin row don't match, then we can jump
+ // out of the loop entirely.
+ if (!key.getRow().equals(origin.getRow())) {
+ break;
+ }
+ // if the rows match but the timestamp is newer, skip it so we can
+ // get to the ones we actually want.
+ if (key.getTimestamp() > origin.getTimestamp()) {
+ continue;
+ }
+ }
+ else{ // compare rows and columns
+ // if the key doesn't match the row and column, then we're done, since
+ // all the cells are ordered.
+ if (!key.matchesRowCol(origin)) {
+ break;
+ }
+ }
+
+ if (!HLogEdit.isDeleted(es.getValue())) {
+ result.add(key);
+ if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
+ // We have enough results. Return.
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * @param key
+ * @return True if an entry exists for <code>key</code> and its content is
+ * {@link HGlobals#deleteBytes}. Useful when checking values in the store;
+ * on occasion only the memcache knows that a cell has been deleted.
+ */
+ boolean isDeleted(final HStoreKey key) {
+ return HLogEdit.isDeleted(this.memcache.get(key));
+ }
+
+ /**
+ * @return a scanner over the keys in the Memcache
+ */
+ HInternalScannerInterface getScanner(long timestamp,
+ Text targetCols[], Text firstRow) throws IOException {
+
+ // Here we rely on ReentrantReadWriteLock's ability to acquire multiple
+ // locks by the same thread and to be able to downgrade a write lock to
+ // a read lock. We need to hold a lock throughout this method, but only
+ // need the write lock while creating the memcache snapshot
+
+ this.lock.writeLock().lock(); // hold write lock during memcache snapshot
+ snapshot(); // snapshot memcache
+ this.lock.readLock().lock(); // acquire read lock
+ this.lock.writeLock().unlock(); // downgrade to read lock
+ try {
+ // Prevent a cache flush while we are constructing the scanner
+
+ return new MemcacheScanner(timestamp, targetCols, firstRow);
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // MemcacheScanner implements the HScannerInterface.
+ // It lets the caller scan the contents of the Memcache.
+ //////////////////////////////////////////////////////////////////////////////
+
+ class MemcacheScanner extends HAbstractScanner {
+ SortedMap<HStoreKey, byte []> backingMap;
+ Iterator<HStoreKey> keyIterator;
+
+ @SuppressWarnings("unchecked")
+ MemcacheScanner(final long timestamp, final Text targetCols[],
+ final Text firstRow) throws IOException {
+
+ super(timestamp, targetCols);
+ try {
+ this.backingMap = new TreeMap<HStoreKey, byte[]>();
+ this.backingMap.putAll(snapshot);
+ this.keys = new HStoreKey[1];
+ this.vals = new byte[1][];
+
+ // Generate list of iterators
+
+ HStoreKey firstKey = new HStoreKey(firstRow);
+ if (firstRow != null && firstRow.getLength() != 0) {
+ keyIterator =
+ backingMap.tailMap(firstKey).keySet().iterator();
+
+ } else {
+ keyIterator = backingMap.keySet().iterator();
+ }
+
+ while (getNext(0)) {
+ if (!findFirstRow(0, firstRow)) {
+ continue;
+ }
+ if (columnMatch(0)) {
+ break;
+ }
+ }
+ } catch (RuntimeException ex) {
+ LOG.error("error initializing Memcache scanner: ", ex);
+ close();
+ IOException e = new IOException("error initializing Memcache scanner");
+ e.initCause(ex);
+ throw e;
+
+ } catch(IOException ex) {
+ LOG.error("error initializing Memcache scanner: ", ex);
+ close();
+ throw ex;
+ }
+ }
+
+ /**
+ * The user didn't want to start scanning at the first row. This method
+ * seeks to the requested row.
+ *
+ * @param i which iterator to advance
+ * @param firstRow seek to this row
+ * @return true if the scanner is positioned at or past <code>firstRow</code>
+ */
+ @Override
+ boolean findFirstRow(int i, Text firstRow) {
+ return firstRow.getLength() == 0 ||
+ keys[i].getRow().compareTo(firstRow) >= 0;
+ }
+
+ /**
+ * Get the next value from the specified iterator.
+ *
+ * @param i Which iterator to fetch next value from
+ * @return true if there is more data available
+ */
+ @Override
+ boolean getNext(int i) {
+ boolean result = false;
+ while (true) {
+ if (!keyIterator.hasNext()) {
+ closeSubScanner(i);
+ break;
+ }
+ // Check key is < than passed timestamp for this scanner.
+ HStoreKey hsk = keyIterator.next();
+ if (hsk == null) {
+ throw new NullPointerException("Unexpected null key");
+ }
+ if (hsk.getTimestamp() <= this.timestamp) {
+ this.keys[i] = hsk;
+ this.vals[i] = backingMap.get(keys[i]);
+ result = true;
+ break;
+ }
+ }
+ return result;
+ }
+
+ /** Shut down an individual map iterator. */
+ @Override
+ void closeSubScanner(int i) {
+ keyIterator = null;
+ keys[i] = null;
+ vals[i] = null;
+ backingMap = null;
+ }
+
+ /** Shut down map iterators */
+ public void close() {
+ if (!scannerClosed) {
+ if(keyIterator != null) {
+ closeSubScanner(0);
+ }
+ scannerClosed = true;
+ }
+ }
+ }
+ }
+
+ /*
+ * Regex that will work for straight filenames and for reference names.
+ * If a reference, then the regex has more than just one group. Group 1 is
+ * this file's id. Group 2 is the referenced region name, etc.
+ */
+ private static Pattern REF_NAME_PARSER =
+ Pattern.compile("^(\\d+)(?:\\.(.+))?$");
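+
+ // Examples (illustrative): a name like "7921358581807344633" matches with
+ // only group 1 set (a plain store file id), while a reference name like
+ // "7921358581807344633.2028772628" also fills group 2 with the encoded
+ // name of the referenced region.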
+
+ private static final String BLOOMFILTER_FILE_NAME = "filter";
+
+ final Memcache memcache = new Memcache();
+ private final Path basedir;
+ private final HRegionInfo info;
+ private final HColumnDescriptor family;
+ private final SequenceFile.CompressionType compression;
+ final FileSystem fs;
+ private final HBaseConfiguration conf;
+ private final Path filterDir;
+ final Filter bloomFilter;
+ private final Path compactionDir;
+
+ private final Integer compactLock = new Integer(0);
+ private final Integer flushLock = new Integer(0);
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ final AtomicInteger activeScanners = new AtomicInteger(0);
+
+ final String storeName;
+
+ /*
+ * Sorted Map of store files keyed by sequence id (most recent should be
+ * last in the list).
+ */
+ final SortedMap<Long, HStoreFile> storefiles =
+ Collections.synchronizedSortedMap(new TreeMap<Long, HStoreFile>());
+
+ /*
+ * Sorted Map of readers keyed by sequence id (most recent should be
+ * last in the list).
+ */
+ private final SortedMap<Long, MapFile.Reader> readers =
+ new TreeMap<Long, MapFile.Reader>();
+
+ private volatile long maxSeqId;
+ private final int compactionThreshold;
+ private final ReentrantReadWriteLock newScannerLock =
+ new ReentrantReadWriteLock();
+
+ /**
+ * An HStore is a set of zero or more MapFiles, which stretch backwards over
+ * time. A given HStore is responsible for a certain set of columns for a
+ * row in the HRegion.
+ *
+ * <p>The HRegion starts writing to its set of HStores when the HRegion's
+ * memcache is flushed. This results in a round of new MapFiles, one for
+ * each HStore.
+ *
+ * <p>There's no reason to consider append-logging at this level; all logging
+ * and locking is handled at the HRegion level. HStore just provides
+ * services to manage sets of MapFiles. One of the most important of those
+ * services is MapFile-compaction services.
+ *
+ * <p>The only thing having to do with logs that HStore needs to deal with is
+ * the reconstructionLog. This is a segment of an HRegion's log that might
+ * NOT be present upon startup. If the param is NULL, there's nothing to do.
+ * If the param is non-NULL, we need to process the log to reconstruct
+ * a TreeMap that might not have been written to disk before the process
+ * died.
+ *
+ * <p>It's assumed that after this constructor returns, the reconstructionLog
+ * file will be deleted (by whoever has instantiated the HStore).
+ *
+ * @param basedir qualified path under which the region directory lives
+ * @param info HRegionInfo for this region
+ * @param family HColumnDescriptor for this column
+ * @param fs file system object
+ * @param reconstructionLog existing log file to apply if any
+ * @param conf configuration object
+ * @throws IOException
+ */
+ HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
+ FileSystem fs, Path reconstructionLog, HBaseConfiguration conf)
+ throws IOException {
+
+ this.basedir = basedir;
+ this.info = info;
+ this.family = family;
+ this.fs = fs;
+ this.conf = conf;
+
+ this.compactionDir = HRegion.getCompactionDir(basedir);
+ this.storeName =
+ this.info.getEncodedName() + "/" + this.family.getFamilyName();
+
+ if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
+ this.compression = SequenceFile.CompressionType.BLOCK;
+ } else if (family.getCompression() ==
+ HColumnDescriptor.CompressionType.RECORD) {
+ this.compression = SequenceFile.CompressionType.RECORD;
+ } else {
+ this.compression = SequenceFile.CompressionType.NONE;
+ }
+
+ Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(),
+ family.getFamilyName());
+ if (!fs.exists(mapdir)) {
+ fs.mkdirs(mapdir);
+ }
+ Path infodir = HStoreFile.getInfoDir(basedir, info.getEncodedName(),
+ family.getFamilyName());
+ if (!fs.exists(infodir)) {
+ fs.mkdirs(infodir);
+ }
+
+ if(family.getBloomFilter() == null) {
+ this.filterDir = null;
+ this.bloomFilter = null;
+ } else {
+ this.filterDir = HStoreFile.getFilterDir(basedir, info.getEncodedName(),
+ family.getFamilyName());
+ if (!fs.exists(filterDir)) {
+ fs.mkdirs(filterDir);
+ }
+ this.bloomFilter = loadOrCreateBloomFilter();
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("starting " + storeName +
+ ((reconstructionLog == null || !fs.exists(reconstructionLog)) ?
+ " (no reconstruction log)" :
+ " with reconstruction log: " + reconstructionLog.toString()));
+ }
+
+ // Go through the 'mapdir' and 'infodir' together, make sure that all
+ // MapFiles are in a reliable state. Every entry in 'mapdir' must have a
+ // corresponding one in 'infodir'. Without a corresponding info
+ // file, the entry in 'mapdir' must be deleted.
+ List<HStoreFile> hstoreFiles = loadHStoreFiles(infodir, mapdir);
+ for(HStoreFile hsf: hstoreFiles) {
+ this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
+ }
+
+ // Now go through all the HSTORE_LOGINFOFILEs and figure out the
+ // most-recent log-seq-ID that's present. The most-recent such ID means we
+ // can ignore all log messages up to and including that ID (because they're
+ // already reflected in the TreeMaps).
+ //
+ // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That
+ // means it was built prior to the previous run of HStore, and so it cannot
+ // contain any updates also contained in the log.
+
+ this.maxSeqId = getMaxSequenceId(hstoreFiles);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("maximum sequence id for hstore " + storeName + " is " +
+ this.maxSeqId);
+ }
+
+ doReconstructionLog(reconstructionLog, maxSeqId);
+
+ // By default, we compact if an HStore has more than
+ // "hbase.hstore.compactionThreshold" (default 3) map files.
+ this.compactionThreshold =
+ conf.getInt("hbase.hstore.compactionThreshold", 3);
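+ // The threshold can be tuned in the site configuration (illustrative;
+ // hbase-site.xml is the usual place for such overrides):
+ // <property>
+ // <name>hbase.hstore.compactionThreshold</name>
+ // <value>5</value>
+ // </property>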
+
+ // We used to compact in here before bringing the store online. Instead
+ // get it online quick even if it needs compactions so we can start
+ // taking updates as soon as possible (Once online, can take updates even
+ // during a compaction).
+
+ // Move maxSeqId on by one. Why here? And not in HRegion?
+ this.maxSeqId += 1;
+
+ // Finally, start up all the map readers! (There could be more than one
+ // since we haven't compacted yet.)
+ boolean first = true;
+ for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+ if (first) {
+ // Use a block cache (if configured) for the first reader only
+ // so as to control memory usage.
+ this.readers.put(e.getKey(),
+ e.getValue().getReader(this.fs, this.bloomFilter,
+ family.isBlockCacheEnabled()));
+ first = false;
+ } else {
+ this.readers.put(e.getKey(),
+ e.getValue().getReader(this.fs, this.bloomFilter));
+ }
+ }
+ }
+
+ /*
+ * @param hstoreFiles
+ * @return Maximum sequence number found or -1.
+ * @throws IOException
+ */
+ private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
+ throws IOException {
+ long maxSeqID = -1;
+ for (HStoreFile hsf : hstoreFiles) {
+ long seqid = hsf.loadInfo(fs);
+ if (seqid > 0) {
+ if (seqid > maxSeqID) {
+ maxSeqID = seqid;
+ }
+ }
+ }
+ return maxSeqID;
+ }
+
+ long getMaxSequenceId() {
+ return this.maxSeqId;
+ }
+
+ /*
+ * Read the reconstructionLog to see whether we need to build a brand-new
+ * MapFile out of non-flushed log entries.
+ *
+ * We can ignore any log message that has a sequence ID that's equal to or
+ * lower than maxSeqID. (Because we know such log messages are already
+ * reflected in the MapFiles.)
+ */
+ private void doReconstructionLog(final Path reconstructionLog,
+ final long maxSeqID) throws UnsupportedEncodingException, IOException {
+
+ if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
+ // Nothing to do.
+ return;
+ }
+ long maxSeqIdInLog = -1;
+ TreeMap<HStoreKey, byte []> reconstructedCache =
+ new TreeMap<HStoreKey, byte []>();
+
+ SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
+ reconstructionLog, this.conf);
+
+ try {
+ HLogKey key = new HLogKey();
+ HLogEdit val = new HLogEdit();
+ long skippedEdits = 0;
+ while (logReader.next(key, val)) {
+ maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+ if (key.getLogSeqNum() <= maxSeqID) {
+ skippedEdits++;
+ continue;
+ }
+ if (skippedEdits > 0 && LOG.isDebugEnabled()) {
+ LOG.debug("Skipped " + skippedEdits +
+ " edits because sequence id <= " + maxSeqID);
+ }
+ // Check this edit is for me. Also, guard against writing
+ // METACOLUMN info such as HBASE::CACHEFLUSH entries
+ Text column = val.getColumn();
+ if (column.equals(HLog.METACOLUMN)
+ || !key.getRegionName().equals(info.getRegionName())
+ || !HStoreKey.extractFamily(column).equals(family.getFamilyName())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Passing on edit " + key.getRegionName() + ", " +
+ column.toString() + ": " +
+ new String(val.getVal(), UTF8_ENCODING) +
+ ", my region: " + info.getRegionName() + ", my column: " +
+ family.getFamilyName());
+ }
+ continue;
+ }
+ HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Applying edit <" + k.toString() + "=" + val.toString() +
+ ">");
+ }
+ reconstructedCache.put(k, val.getVal());
+ }
+ } finally {
+ logReader.close();
+ }
+
+ if (reconstructedCache.size() > 0) {
+ // We create a "virtual flush" at maxSeqIdInLog+1.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushing reconstructionCache");
+ }
+ internalFlushCache(reconstructedCache, maxSeqIdInLog + 1);
+ }
+ }
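+
+ // Replay decision, in brief (illustrative): with maxSeqID 41 already
+ // reflected in the MapFiles and log entries 40..44 present, entries 40 and
+ // 41 are skipped, 42..44 land in reconstructedCache, and the cache is then
+ // flushed as a "virtual flush" at sequence id 45 (maxSeqIdInLog + 1).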
+
+ /*
+ * Creates a series of HStoreFiles loaded from the given directory.
+ * There must be a matching 'mapdir' and 'loginfo' pair of files.
+ * If only one exists, we'll delete it.
+ *
+ * @param infodir qualified path for info file directory
+ * @param mapdir qualified path for map file directory
+ * @throws IOException
+ */
+ private List<HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("infodir: " + infodir.toString() + " mapdir: " +
+ mapdir.toString());
+ }
+ // Look first at info files. If a reference, these contain info we need
+ // to create the HStoreFile.
+ Path infofiles[] = fs.listPaths(new Path[] {infodir});
+ ArrayList<HStoreFile> results = new ArrayList<HStoreFile>(infofiles.length);
+ ArrayList<Path> mapfiles = new ArrayList<Path>(infofiles.length);
+ for (Path p: infofiles) {
+ Matcher m = REF_NAME_PARSER.matcher(p.getName());
+ /*
+ * * * * * N O T E * * * * *
+ *
+ * We call isReference(Path, Matcher) here because it calls
+ * Matcher.matches() which must be called before Matcher.group(int)
+ * and we don't want to call Matcher.matches() twice.
+ *
+ * * * * * N O T E * * * * *
+ */
+ boolean isReference = isReference(p, m);
+ long fid = Long.parseLong(m.group(1));
+
+ HStoreFile curfile = null;
+ HStoreFile.Reference reference = null;
+ if (isReference) {
+ reference = readSplitInfo(p, fs);
+ }
+ curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(),
+ family.getFamilyName(), fid, reference);
+ Path mapfile = curfile.getMapFilePath();
+ if (!fs.exists(mapfile)) {
+ fs.delete(curfile.getInfoFilePath());
+ LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " +
+ "Cleaned up info file. Continuing...");
+ continue;
+ }
+
+ // TODO: Confirm referent exists.
+
+ // Found map and sympathetic info file. Add this hstorefile to result.
+ results.add(curfile);
+ // Keep list of sympathetic data mapfiles for cleaning info dir in next
+ // section. Make sure path is fully qualified for compare.
+ mapfiles.add(mapfile);
+ }
+
+ // Listing paths, in our experience, returns fully qualified names -- at least when
+ // running on a mini hdfs cluster.
+ Path datfiles[] = fs.listPaths(new Path[] {mapdir});
+ for (int i = 0; i < datfiles.length; i++) {
+ // If a data file does not have a sympathetic info file, delete it.
+ if (!mapfiles.contains(fs.makeQualified(datfiles[i]))) {
+ fs.delete(datfiles[i]);
+ }
+ }
+ return results;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Bloom filters
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Called by constructor if a bloom filter is enabled for this column family.
+ * If the HStore already exists, it will read in the bloom filter saved
+ * previously. Otherwise, it will create a new bloom filter.
+ */
+ private Filter loadOrCreateBloomFilter() throws IOException {
+ Path filterFile = new Path(filterDir, BLOOMFILTER_FILE_NAME);
+ Filter bloomFilter = null;
+ if(fs.exists(filterFile)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loading bloom filter for " + this.storeName);
+ }
+
+ BloomFilterDescriptor.BloomFilterType type =
+ family.getBloomFilter().getType();
+
+ switch(type) {
+
+ case BLOOMFILTER:
+ bloomFilter = new BloomFilter();
+ break;
+
+ case COUNTING_BLOOMFILTER:
+ bloomFilter = new CountingBloomFilter();
+ break;
+
+ case RETOUCHED_BLOOMFILTER:
+ bloomFilter = new RetouchedBloomFilter();
+ break;
+
+ default:
+ throw new IllegalArgumentException("unknown bloom filter type: " +
+ type);
+ }
+ FSDataInputStream in = fs.open(filterFile);
+ try {
+ bloomFilter.readFields(in);
+ } finally {
+ in.close();
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("creating bloom filter for " + this.storeName);
+ }
+
+ BloomFilterDescriptor.BloomFilterType type =
+ family.getBloomFilter().getType();
+
+ switch(type) {
+
+ case BLOOMFILTER:
+ bloomFilter = new BloomFilter(family.getBloomFilter().getVectorSize(),
+ family.getBloomFilter().getNbHash());
+ break;
+
+ case COUNTING_BLOOMFILTER:
+ bloomFilter =
+ new CountingBloomFilter(family.getBloomFilter().getVectorSize(),
+ family.getBloomFilter().getNbHash());
+ break;
+
+ case RETOUCHED_BLOOMFILTER:
+ bloomFilter =
+ new RetouchedBloomFilter(family.getBloomFilter().getVectorSize(),
+ family.getBloomFilter().getNbHash());
+ break;
+
+ default:
+ throw new IllegalArgumentException("unknown bloom filter type: " +
+ type);
+ }
+ }
+ return bloomFilter;
+ }
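+
+ // How the filter is wired in, in brief: the single Filter instance built
+ // here is handed to every writer and reader for this store, e.g.
+ // flushedFile.getWriter(this.fs, this.compression, this.bloomFilter) and
+ // hsf.getReader(fs, bloomFilter), so keys appended during flush and
+ // compaction feed the same filter that readers consult to skip misses.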
+
+ /**
+ * Flushes bloom filter to disk
+ *
+ * @throws IOException
+ */
+ private void flushBloomFilter() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushing bloom filter for " + this.storeName);
+ }
+ FSDataOutputStream out =
+ fs.create(new Path(filterDir, BLOOMFILTER_FILE_NAME));
+ try {
+ bloomFilter.write(out);
+ } finally {
+ out.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushed bloom filter for " + this.storeName);
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // End bloom filters
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Adds a value to the memcache
+ *
+ * @param key
+ * @param value
+ */
+ void add(HStoreKey key, byte[] value) {
+ lock.readLock().lock();
+ try {
+ this.memcache.add(key, value);
+
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Close all the MapFile readers
+ *
+ * We don't need to worry about subsequent requests because the HRegion holds
+ * a write lock that will prevent any more reads or writes.
+ *
+ * @throws IOException
+ */
+ List<HStoreFile> close() throws IOException {
+ ArrayList<HStoreFile> result = null;
+ this.lock.writeLock().lock();
+ try {
+ for (MapFile.Reader reader: this.readers.values()) {
+ reader.close();
+ }
+ this.readers.clear();
+ result = new ArrayList<HStoreFile>(storefiles.values());
+ this.storefiles.clear();
+ LOG.debug("closed " + this.storeName);
+ return result;
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Flush changes to disk
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Prior to doing a cache flush, we need to snapshot the memcache. Locking is
+ * handled by the memcache.
+ */
+ void snapshotMemcache() {
+ this.memcache.snapshot();
+ }
+
+ /**
+ * Write out a brand-new set of items to the disk.
+ *
+ * We should only store key/vals that are appropriate for the data-columns
+ * stored in this HStore.
+ *
+ * Also, we are not expecting any reads of this MapFile just yet.
+ *
+ * @param logCacheFlushId flush sequence number
+ * @throws IOException
+ */
+ void flushCache(final long logCacheFlushId) throws IOException {
+ internalFlushCache(memcache.getSnapshot(), logCacheFlushId);
+ }
+
+ private void internalFlushCache(SortedMap<HStoreKey, byte []> cache,
+ long logCacheFlushId) throws IOException {
+
+ synchronized(flushLock) {
+ // A. Write the Maps out to the disk
+ HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
+ info.getEncodedName(), family.getFamilyName(), -1L, null);
+ String name = flushedFile.toString();
+ MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
+ this.bloomFilter);
+
+ // Here we tried picking up an existing HStoreFile from disk and
+ // interlacing the memcache flush compacting as we go. The notion was
+ // that interlacing would take as long as a pure flush with the added
+ // benefit of having one less file in the store. Experiments showed that
+ // it takes two to three times the amount of time flushing -- more column
+ // families makes it so the two timings come closer together -- but it
+ // also complicates the flush. The code was removed. Needed work picking
+ // which file to interlace (favor references first, etc.)
+ //
+ // Related, looks like 'merging compactions' in BigTable paper interlaces
+ // a memcache flush. We don't.
+ int entries = 0;
+ try {
+ for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
+ HStoreKey curkey = es.getKey();
+ TextSequence f = HStoreKey.extractFamily(curkey.getColumn());
+ if (f.equals(this.family.getFamilyName())) {
+ entries++;
+ out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+ }
+ }
+ } finally {
+ out.close();
+ }
+
+ // B. Write out the log sequence number that corresponds to this output
+ // MapFile. The MapFile is current up to and including the log seq num.
+ flushedFile.writeInfo(fs, logCacheFlushId);
+
+ // C. Flush the bloom filter if any
+ if (bloomFilter != null) {
+ flushBloomFilter();
+ }
+
+ // D. Finally, make the new MapFile available.
+ this.lock.writeLock().lock();
+ try {
+ Long flushid = Long.valueOf(logCacheFlushId);
+ // Open the map file reader.
+ this.readers.put(flushid,
+ flushedFile.getReader(this.fs, this.bloomFilter));
+ this.storefiles.put(flushid, flushedFile);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Added " + name + " with " + entries +
+ " entries, sequence id " + logCacheFlushId + ", and size " +
+ StringUtils.humanReadableInt(flushedFile.length()) + " for " +
+ this.storeName);
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ return;
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Compaction
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @return True if this store needs compaction.
+ */
+ boolean needsCompaction() {
+ return this.storefiles != null &&
+ (this.storefiles.size() >= this.compactionThreshold || hasReferences());
+ }
+
+ /*
+ * @return True if this store has references.
+ */
+ private boolean hasReferences() {
+ if (this.storefiles != null) {
+ for (HStoreFile hsf: this.storefiles.values()) {
+ if (hsf.isReference()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Compact the back-HStores. This method may take some time, so the calling
+ * thread must be able to block for long periods.
+ *
+ * <p>During this time, the HStore can work as usual, getting values from
+ * MapFiles and writing new MapFiles from the Memcache.
+ *
+ * Existing MapFiles are not destroyed until the new compacted TreeMap is
+ * completely written-out to disk.
+ *
+ * The compactLock prevents multiple simultaneous compactions.
+ * The structureLock prevents us from interfering with other write operations.
+ *
+ * We don't want to hold the structureLock for the whole time, as a compact()
+ * can be lengthy and we want to allow cache-flushes during this period.
+ * @throws IOException
+ *
+ * @return true if compaction completed successfully
+ */
+ boolean compact() throws IOException {
+ synchronized (compactLock) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("started compaction of " + storefiles.size() +
+ " files using " + compactionDir.toString() + " for " +
+ this.storeName);
+ }
+
+ // Storefiles are keyed by sequence id. The oldest file comes first.
+ // We need to return out of here a List that has the newest file first.
+ List<HStoreFile> filesToCompact =
+ new ArrayList<HStoreFile>(this.storefiles.values());
+ Collections.reverse(filesToCompact);
+ if (filesToCompact.size() < 1 ||
+ (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("nothing to compact for " + this.storeName);
+ }
+ return false;
+ }
+
+ if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
+ LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
+ return false;
+ }
+
+ // Step through them, writing to the brand-new MapFile
+ HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
+ this.compactionDir, info.getEncodedName(), family.getFamilyName(),
+ -1L, null);
+ MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+ this.compression, this.bloomFilter);
+ try {
+ compactHStoreFiles(compactedOut, filesToCompact);
+ } finally {
+ compactedOut.close();
+ }
+
+ // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+ // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
+ long maxId = getMaxSequenceId(filesToCompact);
+ compactedOutputFile.writeInfo(fs, maxId);
+
+ // Move the compaction into place.
+ completeCompaction(filesToCompact, compactedOutputFile);
+ return true;
+ }
+ }
+
+ /*
+ * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
+ * We create a new set of MapFile.Reader objects so we don't screw up the
+ * caching associated with the currently-loaded ones. Our iteration-based
+ * access pattern is practically designed to ruin the cache.
+ *
+ * We work by opening a single MapFile.Reader for each file, and iterating
+ * through them in parallel. We always increment the lowest-ranked one.
+ * Updates to a single row/column will appear ranked by timestamp. This
+ * allows us to throw out deleted values or obsolete versions.
+ *
+ * @param compactedOut
+ * @param toCompactFiles
+ * @throws IOException
+ */
+ private void compactHStoreFiles(final MapFile.Writer compactedOut,
+ final List<HStoreFile> toCompactFiles) throws IOException {
+
+ int size = toCompactFiles.size();
+ CompactionReader[] rdrs = new CompactionReader[size];
+ int index = 0;
+ for (HStoreFile hsf: toCompactFiles) {
+ try {
+ rdrs[index++] =
+ new MapFileCompactionReader(hsf.getReader(fs, bloomFilter));
+ } catch (IOException e) {
+ // Add info about which file threw exception. It may not be in the
+ // exception message so output a message here where we know the
+ // culprit.
+ LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
+ (hsf.isReference() ? " " + hsf.getReference().toString() : "") +
+ " for " + this.storeName);
+ closeCompactionReaders(rdrs);
+ throw e;
+ }
+ }
+ try {
+ HStoreKey[] keys = new HStoreKey[rdrs.length];
+ ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
+ boolean[] done = new boolean[rdrs.length];
+ for(int i = 0; i < rdrs.length; i++) {
+ keys[i] = new HStoreKey();
+ vals[i] = new ImmutableBytesWritable();
+ done[i] = false;
+ }
+
+ // Now, advance through the readers in order. This will have the
+ // effect of a run-time sort of the entire dataset.
+ int numDone = 0;
+ for(int i = 0; i < rdrs.length; i++) {
+ rdrs[i].reset();
+ done[i] = ! rdrs[i].next(keys[i], vals[i]);
+ if(done[i]) {
+ numDone++;
+ }
+ }
+
+ int timesSeen = 0;
+ Text lastRow = new Text();
+ Text lastColumn = new Text();
+ // Map of a row deletes keyed by column with a list of timestamps for value
+ Map<Text, List<Long>> deletes = null;
+ while (numDone < done.length) {
+ // Find the reader with the smallest key. If two files have same key
+ // but different values -- i.e. one is delete and other is non-delete
+ // value -- we will find the first, the one that was written later and
+ // therefore the one whose value should make it out to the compacted
+ // store file.
+ int smallestKey = -1;
+ for(int i = 0; i < rdrs.length; i++) {
+ if(done[i]) {
+ continue;
+ }
+ if(smallestKey < 0) {
+ smallestKey = i;
+ } else {
+ if(keys[i].compareTo(keys[smallestKey]) < 0) {
+ smallestKey = i;
+ }
+ }
+ }
+
+ // Reflect the current key/val in the output
+ HStoreKey sk = keys[smallestKey];
+ if(lastRow.equals(sk.getRow())
+ && lastColumn.equals(sk.getColumn())) {
+ timesSeen++;
+ } else {
+ timesSeen = 1;
+ // We are on to a new row or column. Create a new deletes list.
+ deletes = new HashMap<Text, List<Long>>();
+ }
+
+ byte [] value = (vals[smallestKey] == null)?
+ null: vals[smallestKey].get();
+ if (!isDeleted(sk, value, false, deletes) &&
+ timesSeen <= family.getMaxVersions()) {
+ // Keep old versions until we have maxVersions worth.
+ // Then just skip them.
+ if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
+ // Only write out objects which have a non-zero length key and
+ // value
+ compactedOut.append(sk, vals[smallestKey]);
+ }
+ }
+
+ // Update last-seen items
+ lastRow.set(sk.getRow());
+ lastColumn.set(sk.getColumn());
+
+ // Advance the smallest key. If that reader's all finished, then
+ // mark it as done.
+ if(!rdrs[smallestKey].next(keys[smallestKey],
+ vals[smallestKey])) {
+ done[smallestKey] = true;
+ rdrs[smallestKey].close();
+ rdrs[smallestKey] = null;
+ numDone++;
+ }
+ }
+ } finally {
+ closeCompactionReaders(rdrs);
+ }
+ }
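+
+ // Merge sketch (illustrative): given two files holding the same
+ // (row, column, timestamp) coordinate, the newer file ranks first; if its
+ // value is a delete marker, isDeleted() records the timestamp in the
+ // 'deletes' map, and the older file's stale value for that coordinate is
+ // suppressed instead of being written to compactedOut.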
+
+ private void closeCompactionReaders(final CompactionReader [] rdrs) {
+ for (int i = 0; i < rdrs.length; i++) {
+ if (rdrs[i] != null) {
+ try {
+ rdrs[i].close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing reader for " + this.storeName, e);
+ }
+ }
+ }
+ }
+
+ /** Interface for generic reader for compactions */
+ interface CompactionReader {
+
+ /**
+ * Closes the reader
+ * @throws IOException
+ */
+ public void close() throws IOException;
+
+ /**
+ * Get the next key/value pair
+ *
+ * @param key
+ * @param val
+ * @return true if more data was returned
+ * @throws IOException
+ */
+ public boolean next(WritableComparable key, Writable val)
+ throws IOException;
+
+ /**
+ * Resets the reader
+ * @throws IOException
+ */
+ public void reset() throws IOException;
+ }
+
+ /** A compaction reader for MapFile */
+ static class MapFileCompactionReader implements CompactionReader {
+ final MapFile.Reader reader;
+
+ MapFileCompactionReader(final MapFile.Reader r) {
+ this.reader = r;
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ this.reader.close();
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(WritableComparable key, Writable val)
+ throws IOException {
+ return this.reader.next(key, val);
+ }
+
+ /** {@inheritDoc} */
+ public void reset() throws IOException {
+ this.reader.reset();
+ }
+ }
+
+ /*
+ * Check if this cell is deleted.
+ * If checking the memcache, check whether the key has a delete entry
+ * there. Otherwise, check that the value is not the
+ * <code>HGlobals.deleteBytes</code> value.
+ * If the passed value IS deleteBytes, then it is added to the passed
+ * deletes map.
+ * @param hsk
+ * @param value
+ * @param checkMemcache true if the memcache should be consulted
+ * @param deletes Map keyed by column with a list of timestamps for value.
+ * Can be null. If non-null and the passed value is HGlobals.deleteBytes,
+ * then we add to this map.
+ * @return True if this is a deleted cell. Adds to the passed deletes map
+ * if the passed value is HGlobals.deleteBytes.
+ */
+ private boolean isDeleted(final HStoreKey hsk, final byte [] value,
+ final boolean checkMemcache, final Map<Text, List<Long>> deletes) {
+ if (checkMemcache && memcache.isDeleted(hsk)) {
+ return true;
+ }
+ List<Long> timestamps =
+ (deletes == null) ? null: deletes.get(hsk.getColumn());
+ if (timestamps != null &&
+ timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
+ return true;
+ }
+ if (value == null) {
+ // If a null value, shouldn't be in here. Mark it as deleted cell.
+ return true;
+ }
+ if (!HLogEdit.isDeleted(value)) {
+ return false;
+ }
+ // Cell has delete value. Save it into deletes.
+ if (deletes != null) {
+ if (timestamps == null) {
+ timestamps = new ArrayList<Long>();
+ deletes.put(hsk.getColumn(), timestamps);
+ }
+ // We know it's not already in the deletes list, else we'd have returned
+ // earlier, so no need to test if timestamps already has this value.
+ timestamps.add(Long.valueOf(hsk.getTimestamp()));
+ }
+ return true;
+ }
+
+ /*
+ * It's assumed that the compactLock will be acquired prior to calling this
+ * method! Otherwise, it is not thread-safe!
+ *
+ * It works by processing a compaction that's been written to disk.
+ *
+ * <p>It is usually invoked at the end of a compaction, but might also be
+ * invoked at HStore startup, if the prior execution died midway through.
+ *
+ * <p>Moving the compacted TreeMap into place means:
+ * <pre>
+ * 1) Wait for active scanners to exit
+ * 2) Acquiring the write-lock
+ * 3) Moving the new compacted MapFile into place
+ * 4) Unloading all the replaced MapFiles
+ * 5) Deleting all the old MapFile files
+ * 6) Loading the new TreeMap
+ * 7) Releasing the write-lock
+ * 8) Allowing new scanners to proceed
+ * </pre>
+ *
+ * @param compactedFiles list of files that were compacted
+ * @param compactedFile HStoreFile that is the result of the compaction
+ * @throws IOException
+ */
+ private void completeCompaction(List<HStoreFile> compactedFiles,
+ HStoreFile compactedFile) throws IOException {
+
+ // 1. Wait for active scanners to exit
+
+ newScannerLock.writeLock().lock(); // prevent new scanners
+ try {
+ synchronized (activeScanners) {
+ while (activeScanners.get() != 0) {
+ try {
+ activeScanners.wait();
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+
+ // 2. Acquiring the HStore write-lock
+ this.lock.writeLock().lock();
+ }
+
+ try {
+ // 3. Moving the new MapFile into place.
+
+ HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
+ info.getEncodedName(), family.getFamilyName(), -1, null);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("moving " + compactedFile.toString() + " in " +
+ this.compactionDir.toString() + " to " +
+ finalCompactedFile.toString() + " in " + basedir.toString() +
+ " for " + this.storeName);
+ }
+ if (!compactedFile.rename(this.fs, finalCompactedFile)) {
+ LOG.error("Failed move of compacted file " +
+ finalCompactedFile.toString() + " for " + this.storeName);
+ return;
+ }
+
+ // 4. and 5. Unload all the replaced MapFiles, close and delete.
+
+ List<Long> toDelete = new ArrayList<Long>();
+ for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+ if (!compactedFiles.contains(e.getValue())) {
+ continue;
+ }
+ Long key = e.getKey();
+ MapFile.Reader reader = this.readers.remove(key);
+ if (reader != null) {
+ reader.close();
+ }
+ toDelete.add(key);
+ }
+
+ try {
+ for (Long key: toDelete) {
+ HStoreFile hsf = this.storefiles.remove(key);
+ hsf.delete();
+ }
+
+ // 6. Loading the new TreeMap.
+ Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
+ this.readers.put(orderVal,
+ // Use a block cache (if configured) for this reader since
+ // it is the only one.
+ finalCompactedFile.getReader(this.fs, this.bloomFilter,
+ family.isBlockCacheEnabled()));
+ this.storefiles.put(orderVal, finalCompactedFile);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.error("Failed replacing compacted files for " + this.storeName +
+ ". Compacted file is " + finalCompactedFile.toString() +
+ ". Files replaced are " + compactedFiles.toString() +
+ " some of which may have been already removed", e);
+ }
+ } finally {
+ // 7. Releasing the write-lock
+ this.lock.writeLock().unlock();
+ }
+ } finally {
+ // 8. Allow new scanners to proceed.
+ newScannerLock.writeLock().unlock();
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Accessors.
+ // (This is the only section that is directly useful!)
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Return all the available columns for the given key. The key indicates a
+ * row and timestamp, but not a column name.
+ *
+ * Found columns are added to the passed <code>results</code> map, which
+ * maps column names to byte arrays (byte[]).
+ */
+ void getFull(HStoreKey key, TreeMap<Text, byte []> results)
+ throws IOException {
+ Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
+
+ if (key == null) {
+ return;
+ }
+
+ this.lock.readLock().lock();
+ try {
+ memcache.getFull(key, results);
+ MapFile.Reader[] maparray = getReaders();
+ for (int i = maparray.length - 1; i >= 0; i--) {
+ MapFile.Reader map = maparray[i];
+ synchronized(map) {
+ map.reset();
+ ImmutableBytesWritable readval = new ImmutableBytesWritable();
+ HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+ if (readkey == null) {
+ continue;
+ }
+ do {
+ Text readcol = readkey.getColumn();
+ if (results.get(readcol) == null
+ && key.matchesWithoutColumn(readkey)) {
+ if(isDeleted(readkey, readval.get(), true, deletes)) {
+ break;
+ }
+ results.put(new Text(readcol), readval.get());
+ readval = new ImmutableBytesWritable();
+ } else if(key.getRow().compareTo(readkey.getRow()) < 0) {
+ break;
+ }
+
+ } while(map.next(readkey, readval));
+ }
+ }
+
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ MapFile.Reader [] getReaders() {
+ return this.readers.values().
+ toArray(new MapFile.Reader[this.readers.size()]);
+ }
+
+ /**
+ * Get the value for the indicated HStoreKey. Grab the target value and the
+ * previous 'numVersions-1' values, as well.
+ *
+ * @param key
+ * @param numVersions Number of versions to fetch. Must be > 0.
+ * @return values for the specified versions
+ * @throws IOException
+ */
+ byte [][] get(HStoreKey key, int numVersions) throws IOException {
+ if (numVersions <= 0) {
+ throw new IllegalArgumentException("Number of versions must be > 0");
+ }
+
+ this.lock.readLock().lock();
+ try {
+ // Check the memcache
+ List<byte[]> results = this.memcache.get(key, numVersions);
+ // If we got sufficient versions from memcache, return.
+ if (results.size() == numVersions) {
+ return ImmutableBytesWritable.toArray(results);
+ }
+
+ // Keep a list of deleted cell keys. We need this because as we go through
+ // the store files, the cell with the delete marker may be in one file and
+ // the old non-delete cell value in a later store file. If we don't keep
+ // around the fact that the cell was deleted in a newer record, we end up
+ // returning the old value if user is asking for more than one version.
+ // This List of deletes should not be large since we are only keeping rows
+ // and columns that match those set on the scanner and which have delete
+ // values. If memory usage becomes an issue, could redo as bloom filter.
+ Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
+ // This code below is very close to the body of the getKeys method.
+ MapFile.Reader[] maparray = getReaders();
+ for(int i = maparray.length - 1; i >= 0; i--) {
+ MapFile.Reader map = maparray[i];
+ synchronized(map) {
+ map.reset();
+ ImmutableBytesWritable readval = new ImmutableBytesWritable();
+ HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+ if (readkey == null) {
+ // map.getClosest returns null if the passed key is > than the
+ // last key in the map file. getClosest is a bit of a misnomer
+ // since it returns exact match or the next closest key AFTER not
+ // BEFORE.
+ continue;
+ }
+ if (!readkey.matchesRowCol(key)) {
+ continue;
+ }
+ if (!isDeleted(readkey, readval.get(), true, deletes)) {
+ results.add(readval.get());
+ // Perhaps only one version is wanted. I could let this
+ // test happen later in the for loop test but it would cost
+ // the allocation of an ImmutableBytesWritable.
+ if (hasEnoughVersions(numVersions, results)) {
+ break;
+ }
+ }
+ for (readval = new ImmutableBytesWritable();
+ map.next(readkey, readval) &&
+ readkey.matchesRowCol(key) &&
+ !hasEnoughVersions(numVersions, results);
+ readval = new ImmutableBytesWritable()) {
+ if (!isDeleted(readkey, readval.get(), true, deletes)) {
+ results.add(readval.get());
+ }
+ }
+ }
+ if (hasEnoughVersions(numVersions, results)) {
+ break;
+ }
+ }
+ return results.size() == 0 ?
+ null : ImmutableBytesWritable.toArray(results);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
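+
+ // Read layering, in brief (illustrative): get() consults the memcache
+ // first, then walks the MapFiles newest-to-oldest; a delete marker met in
+ // a newer file is recorded in 'deletes' and masks the older value even
+ // when the two live in different store files.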
+
+ private boolean hasEnoughVersions(final int numVersions,
+ final List<byte []> results) {
+ return numVersions > 0 && results.size() >= numVersions;
+ }
+
+ /**
+ * Get <code>versions</code> keys matching the origin key's
+ * row/column/timestamp, and those of an older vintage.
+ * Default access so it can be accessed out of {@link HRegionServer}.
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all. The count includes
+ * keys found in the memcache.
+ * @return Keys found in the memcache with up to <code>versions</code>
+ * matching keys found in store files appended.
+ * @throws IOException
+ */
+ List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
+ throws IOException {
+
+ List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
+ if (versions != ALL_VERSIONS && keys.size() >= versions) {
+ return keys;
+ }
+
+ // This code below is very close to the body of the get method.
+ this.lock.readLock().lock();
+ try {
+ MapFile.Reader[] maparray = getReaders();
+ for(int i = maparray.length - 1; i >= 0; i--) {
+ MapFile.Reader map = maparray[i];
+ synchronized(map) {
+ map.reset();
+
+ // do the priming read
+ ImmutableBytesWritable readval = new ImmutableBytesWritable();
+ HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval);
+ if (readkey == null) {
+ // map.getClosest returns null if the passed key is > than the
+ // last key in the map file. getClosest is a bit of a misnomer
+ // since it returns exact match or the next closest key AFTER not
+ // BEFORE.
+ continue;
+ }
+
+ do {
+ // if the row matches, we might want this one.
+ if (rowMatches(origin, readkey)) {
+ // if the cell matches, then we definitely want this key.
+ if (cellMatches(origin, readkey)) {
+ // store the key if it isn't deleted or superseded by what's
+ // in the memcache
+ if (!isDeleted(readkey, readval.get(), false, null) &&
+ !keys.contains(readkey)) {
+ keys.add(new HStoreKey(readkey));
+
+ // if we've collected enough versions, then exit the loop.
+ if (versions != ALL_VERSIONS && keys.size() >= versions) {
+ break;
+ }
+ }
+ } else {
+ // the cell doesn't match, but there might be more with different
+ // timestamps, so move to the next key
+ continue;
+ }
+ } else {
+ // the row doesn't match, so we've gone too far.
+ break;
+ }
+ } while (map.next(readkey, readval)); // advance to the next key
+ }
+ }
+
+ return keys;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
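+
+ // Illustrative sketch, not part of the original patch: the priming
+ // read plus do/while pattern used by getKeys above, modeled on a
+ // SortedMap of string keys. tailMap stands in for MapFile.getClosest:
+ // it positions at the exact match or the next key AFTER, never
+ // BEFORE; startsWith stands in for the rowMatches test, and maxKeys
+ // (assumed positive) caps the result. Hypothetical helper.
+ private static List<String> sketchKeysFrom(
+ final SortedMap<String, String> file, final String origin,
+ final int maxKeys) {
+ List<String> keys = new ArrayList<String>();
+ // Priming read: position at origin or the first key after it.
+ Iterator<String> it = file.tailMap(origin).keySet().iterator();
+ while (it.hasNext() && keys.size() < maxKeys) {
+ String k = it.next();
+ if (!k.startsWith(origin)) {
+ break; // past the wanted row; no further matches possible
+ }
+ keys.add(k);
+ }
+ return keys;
+ }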
+
+ /**
+ * Find the key that matches <i>row</i> exactly, or the one that immediately
+ * precedes it.
+ */
+ public Text getRowKeyAtOrBefore(final Text row, final long timestamp)
+ throws IOException {
+ // if the exact key is found, return that key
+ // if we find a key that is greater than our search key, then use the
+ // last key we processed, and if that was null, return null.
+
+ Text foundKey = memcache.getRowKeyAtOrBefore(row, timestamp);
+ if (foundKey != null) {
+ return foundKey;
+ }
+
+ // obtain read lock
+ this.lock.readLock().lock();
+ try {
+ MapFile.Reader[] maparray = getReaders();
+
+ Text bestSoFar = null;
+
+ // process each store file
+ for(int i = maparray.length - 1; i >= 0; i--) {
+ Text row_from_mapfile =
+ rowAtOrBeforeFromMapFile(maparray[i], row, timestamp);
+
+ // if the result from the mapfile is null, then we know that
+ // the mapfile was empty and can move on to the next one.
+ if (row_from_mapfile == null) {
+ continue;
+ }
+
+ // short circuit on an exact match
+ if (row.equals(row_from_mapfile)) {
+ return row;
+ }
+
+ // check to see if we've found a new closest row key as a result
+ if (bestSoFar == null || bestSoFar.compareTo(row_from_mapfile) < 0) {
+ bestSoFar = row_from_mapfile;
+ }
+ }
+
+ return bestSoFar;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
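+
+ // Illustrative sketch, not part of the original patch: how the loop
+ // above merges per-mapfile candidates. Every candidate is already at
+ // or before the search row, so the closest overall is simply the
+ // maximum, with an exact match short-circuiting. Hypothetical helper
+ // over plain strings.
+ private static String sketchBestAtOrBefore(final String row,
+ final List<String> candidates) {
+ String bestSoFar = null;
+ for (String candidate : candidates) {
+ if (candidate == null) {
+ continue; // that mapfile had nothing at or before the row
+ }
+ if (row.equals(candidate)) {
+ return row; // exact match beats everything
+ }
+ if (bestSoFar == null || bestSoFar.compareTo(candidate) < 0) {
+ bestSoFar = candidate;
+ }
+ }
+ return bestSoFar;
+ }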
+
+ /**
+ * Check an individual MapFile for the row at or before a given key
+ * and timestamp
+ */
+ private Text rowAtOrBeforeFromMapFile(MapFile.Reader map, Text row,
+ long timestamp)
+ throws IOException {
+ HStoreKey searchKey = new HStoreKey(row, timestamp);
+ Text previousRow = null;
+ ImmutableBytesWritable readval = new ImmutableBytesWritable();
+ HStoreKey readkey = new HStoreKey();
+
+ synchronized(map) {
+ // don't bother with the rest of this if the file is empty
+ map.reset();
+ if (!map.next(readkey, readval)) {
+ return null;
+ }
+
+ HStoreKey finalKey = new HStoreKey();
+ map.finalKey(finalKey);
+ if (finalKey.getRow().compareTo(row) < 0) {
+ return finalKey.getRow();
+ }
+
+ // seek to the exact row, or the one that would be immediately before it
+ readkey = (HStoreKey)map.getClosest(searchKey, readval, true);
+
+ if (readkey == null) {
+ // didn't find anything that would match, so return null
+ return null;
+ }
+
+ do {
+ if (readkey.getRow().compareTo(row) == 0) {
+ // exact match on row
+ if (readkey.getTimestamp() <= timestamp) {
+ // timestamp fits, return this key
+ return readkey.getRow();
+ }
+ // getting here means that we matched the row, but the timestamp
+ // is too recent - hopefully one of the next cells will match
+ // better, so keep rolling
+ continue;
+ } else if (readkey.getRow().compareTo(row) > 0) {
+ // if the row key we just read is beyond the key we're searching for,
+ // then we're done; return the last key we saw before this one
+ return previousRow;
+ } else {
+ // so, the row key doesn't match, and we haven't gone past the row
+ // we're seeking yet, so this row is a candidate for closest, as
+ // long as the timestamp is correct.
+ if (readkey.getTimestamp() <= timestamp) {
+ previousRow = new Text(readkey.getRow());
+ }
+ // otherwise, ignore this key, because it doesn't fulfill our
+ // requirements.
+ }
+ } while(map.next(readkey, readval));
+ }
+ // getting here means we exhausted all of the cells in the mapfile.
+ // whatever satisfying row we reached previously is the row we should
+ // return
+ return previousRow;
+ }
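+
+ // Illustrative sketch, not part of the original patch: the scan above
+ // reduced to parallel lists of rows and timestamps sorted by row. We
+ // remember the last row whose timestamp qualified; the first row past
+ // the search row ends the scan. Hypothetical helper with the same
+ // control flow as rowAtOrBeforeFromMapFile.
+ private static String sketchRowAtOrBefore(final List<String> rows,
+ final List<Long> stamps, final String row, final long timestamp) {
+ String previousRow = null;
+ for (int i = 0; i < rows.size(); i++) {
+ int cmp = rows.get(i).compareTo(row);
+ if (cmp == 0 && stamps.get(i).longValue() <= timestamp) {
+ return rows.get(i); // exact row with an acceptable timestamp
+ }
+ if (cmp > 0) {
+ return previousRow; // gone past the search row; use last candidate
+ }
+ if (cmp < 0 && stamps.get(i).longValue() <= timestamp) {
+ previousRow = rows.get(i); // candidate for closest-before
+ }
+ }
+ return previousRow; // exhausted the file
+ }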
+
+ /**
+ * Test that the <i>target</i> matches the <i>origin</i>. If the
+ * <i>origin</i> has an empty column, it is taken to match any column, and
+ * only the row and timestamp are compared. Otherwise, it compares the
+ * keys with HStoreKey.matchesRowCol().
+ * @param origin The key we're testing against
+ * @param target The key we're testing
+ */
+ private boolean cellMatches(HStoreKey origin, HStoreKey target){
+ // if the origin's column is empty, then we're matching any column
+ if (origin.getColumn().getLength() == 0) {
+ // if the row matches, then...
+ if (target.getRow().equals(origin.getRow())) {
+ // check the timestamp
+ return target.getTimestamp() <= origin.getTimestamp();
+ }
+ return false;
+ }
+ // otherwise, we want to match on row and column
+ return target.matchesRowCol(origin);
+ }
+
+ /**
+ * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
+ * has an empty column, then it just tests row equivalence. Otherwise, it uses
+ * HStoreKey.matchesRowCol().
+ * @param origin Key we're testing against
+ * @param target Key we're testing
+ */
+ private boolean rowMatches(HStoreKey origin, HStoreKey target){
+ // if the origin's column is empty, then we're matching any column
+ if (origin.getColumn().getLength() == 0) {
+ // if the row matches, then...
+ return target.getRow().equals(origin.getRow());
+ }
+ // otherwise, we want to match on row and column
+ return target.matchesRowCol(origin);
+ }
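+
+ // Illustrative sketch, not part of the original patch: the
+ // empty-column convention shared by cellMatches and rowMatches above.
+ // An origin with no column acts as a wildcard; checkStamp selects the
+ // cellMatches behavior, which also bounds the timestamp. Hypothetical
+ // helper over plain strings and longs.
+ private static boolean sketchMatches(final String originRow,
+ final String originColumn, final long originStamp,
+ final String targetRow, final String targetColumn,
+ final long targetStamp, final boolean checkStamp) {
+ if (originColumn.length() == 0) {
+ // wildcard column: match on row, plus timestamp for cellMatches
+ return targetRow.equals(originRow)
+ && (!checkStamp || targetStamp <= originStamp);
+ }
+ // non-wildcard: row and column must both match
+ return targetRow.equals(originRow) && targetColumn.equals(originColumn);
+ }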
+
+ /**
+ * Gets size for the store.
+ *
+ * @param midKey Gets set to the middle key of the largest splitable store
+ * file, or is set to empty if the largest file is not splitable.
+ * @return Sizes for the store. The passed <code>midKey</code> is set to
+ * the midkey of the largest splitable store file; otherwise, it is set to
+ * empty to indicate we couldn't find a midkey to split on.
+ */
+ HStoreSize size(Text midKey) {
+ long maxSize = 0L;
+ long aggregateSize = 0L;
+ // The store is not splitable if it contains a reference store file.
+ boolean splitable = true;
+ if (this.storefiles.size() <= 0) {
+ return new HStoreSize(0, 0, splitable);
+ }
+
+ this.lock.readLock().lock();
+ try {
+ Long mapIndex = Long.valueOf(0L);
+ // Iterate through all the MapFiles
+ for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
+ HStoreFile curHSF = e.getValue();
+ long size = curHSF.length();
+ aggregateSize += size;
+ if (maxSize == 0L || size > maxSize) {
+ // This is the largest one so far
+ maxSize = size;
+ mapIndex = e.getKey();
+ }
+ if (splitable) {
+ splitable = !curHSF.isReference();
+ }
+ }
+ if (splitable) {
+ MapFile.Reader r = this.readers.get(mapIndex);
+ // seek back to the beginning of mapfile
+ r.reset();
+ // get the first and last keys
+ HStoreKey firstKey = new HStoreKey();
+ HStoreKey lastKey = new HStoreKey();
+ Writable value = new ImmutableBytesWritable();
+ r.next(firstKey, value);
+ r.finalKey(lastKey);
+ // get the midkey
+ HStoreKey mk = (HStoreKey)r.midKey();
+ if (mk != null) {
+ // if the midkey is the same as the first and last keys, then we cannot
+ // (ever) split this region.
+ if (mk.getRow().equals(firstKey.getRow()) &&
+ mk.getRow().equals(lastKey.getRow())) {
+ return new HStoreSize(aggregateSize, maxSize, false);
+ }
+ // Otherwise, set midKey
+ midKey.set(mk.getRow());
+ }
+ }
+ } catch(IOException e) {
+ LOG.warn("Failed getting store size for " + this.storeName, e);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ return new HStoreSize(aggregateSize, maxSize, splitable);
+ }
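+
+ // Illustrative sketch, not part of the original patch: the midkey
+ // test above, with a sorted row list standing in for the largest
+ // mapfile. If the middle row equals both the first and last rows,
+ // every key in the file shares one row and no usable split point
+ // exists. Hypothetical helper.
+ private static String sketchMidKey(final List<String> sortedRows) {
+ if (sortedRows.isEmpty()) {
+ return null;
+ }
+ String mid = sortedRows.get(sortedRows.size() / 2);
+ if (mid.equals(sortedRows.get(0)) &&
+ mid.equals(sortedRows.get(sortedRows.size() - 1))) {
+ return null; // single-row file: not splitable
+ }
+ return mid; // a row we can split the region on
+ }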
+
+ //////////////////////////////////////////////////////////////////////////////
+ // File administration
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Return a scanner for both the memcache and the HStore files
+ */
+ HInternalScannerInterface getScanner(long timestamp, Text targetCols[],
+ Text firstRow, RowFilterInterface filter) throws IOException {
+
+ newScannerLock.readLock().lock(); // a compaction can block creation
+ // of new scanners by taking the write lock on newScannerLock
+ try {
+ lock.readLock().lock(); // lock HStore
+ try {
+ return new HStoreScanner(targetCols, firstRow, timestamp, filter);
+
+ } finally {
+ lock.readLock().unlock();
+ }
+ } finally {
+ newScannerLock.readLock().unlock();
+ }
+ }
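+
+ // Illustrative sketch, not part of the original patch: the nested
+ // read-lock pattern getScanner uses above. Both read locks may be
+ // held by many creators at once, while a writer (e.g. a compaction
+ // holding the outer write lock) blocks creation entirely.
+ // Hypothetical helper; locks are released innermost first.
+ private static <T> T sketchUnderReadLocks(
+ final ReentrantReadWriteLock outer,
+ final ReentrantReadWriteLock inner,
+ final java.util.concurrent.Callable<T> work) throws Exception {
+ outer.readLock().lock();
+ try {
+ inner.readLock().lock();
+ try {
+ return work.call();
+ } finally {
+ inner.readLock().unlock(); // innermost first
+ }
+ } finally {
+ outer.readLock().unlock();
+ }
+ }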
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return this.storeName;
+ }
+
+ /*
+ * @see writeSplitInfo(Path p, HStoreFile hsf, FileSystem fs)
+ */
+ static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
+ throws IOException {
+ FSDataInputStream in = fs.open(p);
+ try {
+ HStoreFile.Reference r = new HStoreFile.Reference();
+ r.readFields(in);
+ return r;
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * @param p Path to check.
+ * @return True if the path has format of a HStoreFile reference.
+ */
+ public static boolean isReference(final Path p) {
+ return isReference(p, REF_NAME_PARSER.matcher(p.getName()));
+ }
+
+ private static boolean isReference(final Path p, final Matcher m) {
+ if (m == null || !m.matches()) {
+ LOG.warn("Failed match of store file name " + p.toString());
+ throw new RuntimeException("Failed match of store file name " +
+ p.toString());
+ }
+ return m.groupCount() > 1 && m.group(2) != null;
+ }
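+
+ // Illustrative sketch, not part of the original patch: how a
+ // reference is recognized from a file name. REF_NAME_PARSER is
+ // defined elsewhere in this file; the pattern below is only an
+ // assumed stand-in where a present group 2 marks a reference
+ // (e.g. "1234.5678" versus a plain store file "1234").
+ private static boolean sketchIsReference(final String fileName) {
+ Pattern p = Pattern.compile("^(\\d+)(?:\\.(.+))?$"); // assumption
+ Matcher m = p.matcher(fileName);
+ return m.matches() && m.group(2) != null;
+ }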
+
+ /**
+ * A scanner that iterates through the HStore files
+ */
+ private class StoreFileScanner extends HAbstractScanner {
+ @SuppressWarnings("hiding")
+ private MapFile.Reader[] readers;
+
+ StoreFileScanner(long timestamp, Text[] targetCols, Text firstRow)
+ throws IOException {
+ super(timestamp, targetCols);
+ try {
+ this.readers = new MapFile.Reader[storefiles.size()];
+
+ // Most recent map file should be first
+ int i = readers.length - 1;
+ for(HStoreFile curHSF: storefiles.values()) {
+ readers[i--] = curHSF.getReader(fs, bloomFilter);
+ }
+
+ this.keys = new HStoreKey[readers.length];
+ this.vals = new byte[readers.length][];
+
+ // Advance the readers to the first pos.
+ for(i = 0; i < readers.length; i++) {
+ keys[i] = new HStoreKey();
+
+ if(firstRow.getLength() != 0) {
+ if(findFirstRow(i, firstRow)) {
+ continue;
+ }
+ }
+
+ while(getNext(i)) {
+ if(columnMatch(i)) {
+ break;
+ }
+ }
+ }
+
+ } catch (Exception ex) {
+ close();
+ IOException e = new IOException("StoreFileScanner failed construction");
+ e.initCause(ex);
+ throw e;
+ }
+ }
+
+ /**
+ * The user didn't want to start scanning at the first row. This method
+ * seeks to the requested row.
+ *
+ * @param i - which iterator to advance
+ * @param firstRow - seek to this row
+ * @return - true if the row was not found (the sub-scanner is closed),
+ * or whether the key found matches the requested columns
+ */
+ @Override
+ boolean findFirstRow(int i, Text firstRow) throws IOException {
+ ImmutableBytesWritable ibw = new ImmutableBytesWritable();
+ HStoreKey firstKey
+ = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw);
+ if (firstKey == null) {
+ // Didn't find it. Close the scanner and return TRUE
+ closeSubScanner(i);
+ return true;
+ }
+ this.vals[i] = ibw.get();
+ keys[i].setRow(firstKey.getRow());
+ keys[i].setColumn(firstKey.getColumn());
+ keys[i].setVersion(firstKey.getTimestamp());
+ return columnMatch(i);
+ }
+
+ /**
+ * Get the next value from the specified reader.
+ *
+ * @param i - which reader to fetch next value from
+ * @return - true if there is more data available
+ */
+ @Override
+ boolean getNext(int i) throws IOException {
+ boolean result = false;
+ ImmutableBytesWritable ibw = new ImmutableBytesWritable();
+ while (true) {
+ if (!readers[i].next(keys[i], ibw)) {
+ closeSubScanner(i);
+ break;
+ }
+ if (keys[i].getTimestamp() <= this.timestamp) {
+ vals[i] = ibw.get();
+ result = true;
+ break;
+ }
+ }
+ return result;
+ }
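+
+ // Illustrative sketch, not part of the original patch: the timestamp
+ // gate getNext applies above. Entries newer than the scan time are
+ // skipped; the first old-enough entry is returned and exhaustion
+ // yields null. An iterator of (timestamp, value) pairs stands in for
+ // a reader. Hypothetical helper.
+ private byte[] sketchNextAtOrBefore(
+ final Iterator<Map.Entry<Long, byte[]>> it, final long scanTime) {
+ while (it.hasNext()) {
+ Map.Entry<Long, byte[]> e = it.next();
+ if (e.getKey().longValue() <= scanTime) {
+ return e.getValue(); // old enough for this scan
+ }
+ // too new; keep advancing
+ }
+ return null; // reader exhausted
+ }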
+
+ /** Close down the indicated reader. */
+ @Override
+ void closeSubScanner(int i) {
+ try {
+ if(readers[i] != null) {
+ try {
+ readers[i].close();
+ } catch(IOException e) {
+ LOG.error(storeName + " closing sub-scanner", e);
+ }
+ }
+
+ } finally {
+ readers[i] = null;
+ keys[i] = null;
+ vals[i] = null;
+ }
+ }
+
+ /** Shut it down! */
+ public void close() {
+ if(! scannerClosed) {
+ try {
+ for(int i = 0; i < readers.length; i++) {
+ if(readers[i] != null) {
+ try {
+ readers[i].close();
+ } catch(IOException e) {
+ LOG.error(storeName + " closing scanner", e);
+ }
+ }
+ }
+
+ } finally {
+ scannerClosed = true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Scanner scans both the memcache and the HStore
+ */
+ private class HStoreScanner implements HInternalScannerInterface {
+ private HInternalScannerInterface[] scanners;
+ private TreeMap<Text, byte []>[] resultSets;
+ private HStoreKey[] keys;
+ private boolean wildcardMatch = false;
+ private boolean multipleMatchers = false;
+ private RowFilterInterface dataFilter;
+
+ /** Create a Scanner with a handle on the memcache and HStore files. */
+ @SuppressWarnings("unchecked")
+ HStoreScanner(Text[] targetCols, Text firstRow, long timestamp,
+ RowFilterInterface filter) throws IOException {
+
+ this.dataFilter = filter;
+ if (null != dataFilter) {
+ dataFilter.reset();
+ }
+ this.scanners = new HInternalScannerInterface[2];
+ this.resultSets = new TreeMap[scanners.length];
+ this.keys = new HStoreKey[scanners.length];
+
+ try {
+ scanners[0] = memcache.getScanner(timestamp, targetCols, firstRow);
+ scanners[1] = new StoreFileScanner(timestamp, targetCols, firstRow);
+
+ for (int i = 0; i < scanners.length; i++) {
+ if (scanners[i].isWildcardScanner()) {
+ this.wildcardMatch = true;
+ }
+ if (scanners[i].isMultipleMatchScanner()) {
+ this.multipleMatchers = true;
+ }
+ }
+
+ } catch(IOException e) {
+ for (int i = 0; i < this.scanners.length; i++) {
+ if(scanners[i] != null) {
+ closeScanner(i);
+ }
+ }
+ throw e;
+ }
+
+ // Advance to the first key in each scanner.
+ // All results will match the required column-set and scanTime.
+
+ for (int i = 0; i < scanners.length; i++) {
+ keys[i] = new HStoreKey();
+ resultSets[i] = new TreeMap<Text, byte []>();
+ if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ // As we have now successfully completed initialization, increment the
+ // activeScanner count.
+ activeScanners.incrementAndGet();
+ }
+
+ /** @return true if the scanner is a wild card scanner */
+ public boolean isWildcardScanner() {
+ return wildcardMatch;
+ }
+
+ /** @return true if the scanner is a multiple match scanner */
+ public boolean isMultipleMatchScanner() {
+ return multipleMatchers;
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
+ throws IOException {
+
+ // Filtered flag is set by filters. If a cell has been 'filtered out'
+ // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
+ boolean filtered = true;
+ boolean moreToFollow = true;
+ while (filtered && moreToFollow) {
+ // Find the lowest-possible key.
+ Text chosenRow = null;
+ long chosenTimestamp = -1;
+ for (int i = 0; i < this.keys.length; i++) {
+ if (scanners[i] != null &&
+ (chosenRow == null ||
+ (keys[i].getRow().compareTo(chosenRow) < 0) ||
+ ((keys[i].getRow().compareTo(chosenRow) == 0) &&
+ (keys[i].getTimestamp() > chosenTimestamp)))) {
+ chosenRow = new Text(keys[i].getRow());
+ chosenTimestamp = keys[i].getTimestamp();
+ }
+ }
+
+ // Filter whole row by row key?
+ filtered = dataFilter != null ? dataFilter.filter(chosenRow) : false;
+
+ // Store the key and results for each sub-scanner. Merge them as
+ // appropriate.
+ if (chosenTimestamp >= 0 && !filtered) {
+ // Here we are setting the passed in key with current row+timestamp
+ key.setRow(chosenRow);
+ key.setVersion(chosenTimestamp);
+ key.setColumn(HConstants.EMPTY_TEXT);
+ // Keep list of deleted cell keys within this row. We need this
+ // because as we go through scanners, the delete record may be in an
+ // early scanner and then the same record with a non-delete, non-null
+ // value in a later one. Without history of what we've seen, we'll return
+ // deleted values. This List should not ever grow too large since we
+ // are only keeping rows and columns that match those set on the
+ // scanner and which have delete values. If memory usage becomes a
+ // problem, could redo as bloom filter.
+ List<HStoreKey> deletes = new ArrayList<HStoreKey>();
+ for (int i = 0; i < scanners.length && !filtered; i++) {
+ while ((scanners[i] != null
+ && !filtered
+ && moreToFollow)
+ && (keys[i].getRow().compareTo(chosenRow) == 0)) {
+ // If we are doing a wild card match or there are multiple
+ // matchers per column, we need to scan all the older versions of
+ // this row to pick up the rest of the family members
+ if (!wildcardMatch
+ && !multipleMatchers
+ && (keys[i].getTimestamp() != chosenTimestamp)) {
+ break;
+ }
+
+ // Filter out null criteria columns that are not null
+ if (dataFilter != null) {
+ filtered = dataFilter.filterNotNull(resultSets[i]);
+ }
+
+ // NOTE: We used to do results.putAll(resultSets[i]);
+ // but this had the effect of overwriting newer
+ // values with older ones. So now we only insert
+ // a result if the map does not contain the key.
+ HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT,
+ key.getTimestamp());
+ for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
+ hsk.setColumn(e.getKey());
+ if (HLogEdit.isDeleted(e.getValue())) {
+ if (!deletes.contains(hsk)) {
+ // Key changes as we cycle the for loop so add a copy to
+ // the set of deletes.
+ deletes.add(new HStoreKey(hsk));
+ }
+ } else if (!deletes.contains(hsk) &&
+ !filtered &&
+ moreToFollow &&
+ !results.containsKey(e.getKey())) {
+ if (dataFilter != null) {
+ // Filter whole row by column data?
+ filtered =
+ dataFilter.filter(chosenRow, e.getKey(), e.getValue());
+ if (filtered) {
+ results.clear();
+ break;
+ }
+ }
+ results.put(e.getKey(), e.getValue());
+ }
+ }
+ resultSets[i].clear();
+ if (!scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+ }
+
+ for (int i = 0; i < scanners.length; i++) {
+ // If the current scanner is non-null AND has a lower-or-equal
+ // row label, then its timestamp is bad. We need to advance it.
+ while ((scanners[i] != null) &&
+ (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+ resultSets[i].clear();
+ if (!scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+
+ moreToFollow = chosenTimestamp >= 0;
+
+ if (dataFilter != null) {
+ if (moreToFollow) {
+ dataFilter.rowProcessed(filtered, chosenRow);
+ }
+ if (dataFilter.filterAllRemaining()) {
+ moreToFollow = false;
+ }
+ }
+
+ if (results.size() <= 0 && !filtered) {
+ // There were no results found for this row. Mark it as
+ // 'filtered'-out, otherwise we will not move on to the next row.
+ filtered = true;
+ }
+ }
+
+ // If we got no results, then there is no more to follow.
+ if (results == null || results.size() <= 0) {
+ moreToFollow = false;
+ }
+
+ // Make sure scanners closed if no more results
+ if (!moreToFollow) {
+ for (int i = 0; i < scanners.length; i++) {
+ if (null != scanners[i]) {
+ closeScanner(i);
+ }
+ }
+ }
+
+ return moreToFollow;
+ }
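+
+ // Illustrative sketch, not part of the original patch: the head
+ // selection at the top of next() above. Among live sub-scanners we
+ // pick the lowest row, breaking ties toward the newest timestamp, so
+ // memcache and store-file results merge in row order. Hypothetical
+ // helper over parallel arrays; a null row marks a closed sub-scanner.
+ private int sketchChooseHead(final String[] headRows,
+ final long[] headStamps) {
+ int chosen = -1;
+ for (int i = 0; i < headRows.length; i++) {
+ if (headRows[i] == null) {
+ continue; // closed sub-scanner
+ }
+ if (chosen < 0
+ || headRows[i].compareTo(headRows[chosen]) < 0
+ || (headRows[i].compareTo(headRows[chosen]) == 0
+ && headStamps[i] > headStamps[chosen])) {
+ chosen = i;
+ }
+ }
+ return chosen; // index of next head, or -1 when all are closed
+ }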
+
+
+ /** Shut down a single scanner */
+ void closeScanner(int i) {
+ try {
+ try {
+ scanners[i].close();
+ } catch (IOException e) {
+ LOG.warn(storeName + " failed closing scanner " + i, e);
+ }
+ } finally {
+ scanners[i] = null;
+ keys[i] = null;
+ resultSets[i] = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close() {
+ try {
+ for(int i = 0; i < scanners.length; i++) {
+ if(scanners[i] != null) {
+ closeScanner(i);
+ }
+ }
+ } finally {
+ synchronized (activeScanners) {
+ int numberOfScanners = activeScanners.decrementAndGet();
+ if (numberOfScanners < 0) {
+ LOG.error(storeName +
+ " number of active scanners less than zero: " +
+ numberOfScanners + " resetting to zero");
+ activeScanners.set(0);
+ numberOfScanners = 0;
+ }
+ if (numberOfScanners == 0) {
+ activeScanners.notifyAll();
+ }
+ }
+
+ }
+ }
+
+ /** {@inheritDoc} */
[... 7 lines stripped ...]