Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/08/08 22:30:20 UTC
svn commit: r564012 [2/4] - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/
src/test/org/apache/hadoop/hbase/
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed Aug 8 13:30:13 2007
@@ -36,8 +36,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
@@ -68,7 +69,7 @@
* each column family. (This config info will be communicated via the
* tabledesc.)
*
- * The HTableDescriptor contains metainfo about the HRegion's table.
+ * <p>The HTableDescriptor contains metainfo about the HRegion's table.
* regionName is a unique identifier for this HRegion. (startKey, endKey]
* defines the keyspace for this HRegion.
*/
@@ -76,24 +77,12 @@
static String SPLITDIR = "splits";
static String MERGEDIR = "merges";
static String TMPREGION_PREFIX = "tmpregion_";
- static Random rand = new Random();
+ static final Random rand = new Random();
static final Log LOG = LogFactory.getLog(HRegion.class);
final AtomicBoolean closed = new AtomicBoolean(false);
private long noFlushCount = 0;
-
- /**
- * Deletes all the files for a HRegion
- *
- * @param fs - the file system object
- * @param baseDirectory - base directory for HBase
- * @param regionName - name of the region to delete
- * @throws IOException
- */
- static void deleteRegion(FileSystem fs, Path baseDirectory,
- Text regionName) throws IOException {
- LOG.info("Deleting region " + regionName);
- fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
- }
+ static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA");
+ static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
/**
* Merge two HRegions. They must be available on the current
@@ -108,7 +97,7 @@
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
-
+ FileSystem fs = srcA.getFilesystem();
if(srcA.getStartKey() == null) {
if(srcB.getStartKey() == null) {
throw new IOException("Cannot merge two regions with null start key");
@@ -126,7 +115,6 @@
throw new IOException("Cannot merge non-adjacent regions");
}
- FileSystem fs = a.getFilesystem();
Configuration conf = a.getConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
@@ -143,22 +131,21 @@
HRegionInfo newRegionInfo
= new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey);
- Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName);
+ Path newRegionDir = HRegion.getRegionDir(merges, newRegionInfo.regionName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " + newRegionDir);
}
- LOG.info("starting merge of regions: " + a.getRegionName() + " and "
- + b.getRegionName() + " new region start key is '"
- + (startKey == null ? "" : startKey) + "', end key is '"
- + (endKey == null ? "" : endKey) + "'");
+ LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
+ b.getRegionName() + " into new region " + newRegionInfo.toString());
// Flush each of the sources, and merge their files into a single
// target for each column family.
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge =
new TreeMap<Text, Vector<HStoreFile>>();
+
for(HStoreFile src: a.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
@@ -185,7 +172,7 @@
Text colFamily = es.getKey();
Vector<HStoreFile> srcFiles = es.getValue();
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
- colFamily, Math.abs(rand.nextLong()));
+ colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
alreadyMerged.addAll(srcFiles);
}
@@ -193,7 +180,6 @@
// That should have taken care of the bulk of the data.
// Now close the source HRegions for good, and repeat the above to take care
// of any last-minute inserts
-
if(LOG.isDebugEnabled()) {
LOG.debug("flushing changes since start of merge for region "
+ a.getRegionName());
@@ -235,13 +221,13 @@
for (Map.Entry<Text, Vector<HStoreFile>> es : filesToMerge.entrySet()) {
Text colFamily = es.getKey();
Vector<HStoreFile> srcFiles = es.getValue();
- HStoreFile dst = new HStoreFile(conf, merges,
- newRegionInfo.regionName, colFamily, Math.abs(rand.nextLong()));
+ HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
+ colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
}
// Done
-
+ // Construction moves the merge files into place under region.
HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
newRegionDir);
@@ -304,7 +290,6 @@
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
- *
* @param rootDir root directory for HBase instance
* @param fs is the filesystem.
* @param conf is global configuration settings.
@@ -324,29 +309,27 @@
this.conf = conf;
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
-
-
this.writestate.writesOngoing = true;
this.writestate.writesEnabled = true;
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
-
- this.regiondir = HStoreFile.getHRegionDir(rootDir, this.regionInfo.regionName);
+ this.regiondir = HRegion.getRegionDir(rootDir, this.regionInfo.regionName);
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
- // Move prefab HStore files into place (if any)
-
+ // Move prefab HStore files into place (if any). This picks up split files
+ // and any merges from splits and merges dirs.
if(initialFiles != null && fs.exists(initialFiles)) {
- fs.rename(initialFiles, regiondir);
+ fs.rename(initialFiles, this.regiondir);
}
// Load in all the HStores.
for(Map.Entry<Text, HColumnDescriptor> e :
this.regionInfo.tableDesc.families().entrySet()) {
Text colFamily = HStoreKey.extractFamily(e.getKey());
- stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
- e.getValue(), fs, oldLogFile, conf));
+ stores.put(colFamily,
+ new HStore(rootDir, this.regionInfo.regionName, e.getValue(), fs,
+ oldLogFile, conf));
}
// Get rid of any splits or merges that were lost in-progress
@@ -359,7 +342,7 @@
fs.delete(merges);
}
- // By default, we flush the cache when 32M.
+ // By default, we flush the cache when 16M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*16);
this.blockingMemcacheSize = this.memcacheFlushSize *
@@ -367,8 +350,8 @@
// By default, we compact the region if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
- this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold",
- 3);
+ this.compactionThreshold =
+ conf.getInt("hbase.hregion.compactionThreshold", 3);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
@@ -397,9 +380,8 @@
* time-sensitive thread.
*
* @return Vector of all the storage files that the HRegion's component
- * HStores make use of. It's a list of HStoreFile objects. Returns empty
- * vector if already closed and null if it is judged that it should not
- * close.
+ * HStores make use of. It's a list of all HStoreFile objects. Returns empty
+ * vector if already closed and null if judged that it should not close.
*
* @throws IOException
*/
@@ -443,7 +425,6 @@
if(!shouldClose) {
return null;
}
- LOG.info("closing region " + this.regionInfo.regionName);
// Write lock means no more row locks can be given out. Wait on
// outstanding row locks to come in before we close so we do not drop
@@ -465,124 +446,121 @@
writestate.writesOngoing = false;
}
this.closed.set(true);
- LOG.info("region " + this.regionInfo.regionName + " closed");
+ LOG.info("closed " + this.regionInfo.regionName);
}
} finally {
lock.releaseWriteLock();
}
}
-
- /**
- * Split the HRegion to create two brand-new ones. This will also close the
- * current HRegion.
- *
- * Returns two brand-new (and open) HRegions
+
+ /*
+ * Split the HRegion to create two brand-new ones. This also closes
+ * current HRegion. Split should be fast since we don't rewrite store files
+ * but instead create new 'reference' store files that read off the top and
+ * bottom ranges of parent store files.
+ * @param midKey Row to split on.
+ * @param listener May be null.
+ * @return two brand-new (and open) HRegions
+ * @throws IOException
*/
- HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
+ HRegion[] closeAndSplit(final Text midKey,
+ final RegionUnavailableListener listener)
throws IOException {
- if(((regionInfo.startKey.getLength() != 0)
- && (regionInfo.startKey.compareTo(midKey) > 0))
- || ((regionInfo.endKey.getLength() != 0)
- && (regionInfo.endKey.compareTo(midKey) < 0))) {
- throw new IOException("Region splitkey must lie within region " +
- "boundaries.");
- }
-
+ checkMidKey(midKey);
long startTime = System.currentTimeMillis();
- Path splits = new Path(regiondir, SPLITDIR);
- if(! fs.exists(splits)) {
- fs.mkdirs(splits);
- }
-
- long regionAId = Math.abs(rand.nextLong());
- HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc,
- regionInfo.startKey, midKey);
- long regionBId = Math.abs(rand.nextLong());
- HRegionInfo regionBInfo =
- new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
- Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
- Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
- if(fs.exists(dirA) || fs.exists(dirB)) {
- throw new IOException("Cannot split; target file collision at " + dirA +
- " or " + dirB);
+ Path splits = getSplitsDir();
+ HRegionInfo regionAInfo = new HRegionInfo(Math.abs(rand.nextLong()),
+ this.regionInfo.tableDesc, this.regionInfo.startKey, midKey);
+ Path dirA = getSplitRegionDir(splits, regionAInfo.regionName);
+ if(fs.exists(dirA)) {
+ throw new IOException("Cannot split; target file collision at " + dirA);
+ }
+ HRegionInfo regionBInfo = new HRegionInfo(Math.abs(rand.nextLong()),
+ this.regionInfo.tableDesc, midKey, null);
+ Path dirB = getSplitRegionDir(splits, regionBInfo.regionName);
+ if(this.fs.exists(dirB)) {
+ throw new IOException("Cannot split; target file collision at " + dirB);
}
- // We just copied most of the data. Now get whatever updates are up in
- // the memcache (after shutting down new updates).
-
// Notify the caller that we are about to close the region. This moves
- // us ot the 'retiring' queue. Means no more updates coming in -- just
+ // us to the 'retiring' queue. Means no more updates coming in -- just
// whatever is outstanding.
- listener.closing(this.getRegionName());
+ if (listener != null) {
+ listener.closing(getRegionName());
+ }
- // Wait on the last row updates to come in.
- LOG.debug("Starting wait on row locks.");
- waitOnRowLocks();
-
- // Flush this HRegion out to storage, and turn off flushes
- // or compactions until close() is called.
- LOG.debug("Calling flushcache inside closeAndSplit");
- Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
+
+ // Now close the HRegion. Close returns all store files or null if not
+ // supposed to close (? What to do in this case? Implement abort of close?)
+ // Close also does wait on outstanding rows and calls a flush just-in-case.
+ Vector<HStoreFile> hstoreFilesToSplit = close();
if (hstoreFilesToSplit == null) {
- // It should always return a list of hstore files even if memcache is
- // empty. It will return null if concurrent compaction or splits which
- // should not happen.
- throw new NullPointerException("Flushcache did not return any files");
- }
- TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
- for(HStoreFile hsf: hstoreFilesToSplit) {
- alreadySplit.add(splitStoreFile(hsf, splits, regionAInfo,
- regionBInfo, midKey));
+ LOG.warn("Close came back null (Implement abort of close?)");
}
- // Now close the HRegion
- hstoreFilesToSplit = close();
-
// Tell listener that region is now closed and that they can therefore
// clean up any outstanding references.
- listener.closed(this.getRegionName());
-
- // Copy the small remainder
- for(HStoreFile hsf: hstoreFilesToSplit) {
- if(!alreadySplit.contains(hsf)) {
- splitStoreFile(hsf, splits, regionAInfo, regionBInfo, midKey);
- }
+ if (listener != null) {
+ listener.closed(this.getRegionName());
}
-
- // Done
+
+ // Split each store file.
+ for(HStoreFile h: hstoreFilesToSplit) {
+ // A reference to the bottom half of the hsf store file.
+ HStoreFile.Reference aReference = new HStoreFile.Reference(
+ getRegionName(), h.getFileId(), new HStoreKey(midKey),
+ HStoreFile.Range.bottom);
+ HStoreFile a = new HStoreFile(this.conf, splits,
+ regionAInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
+ aReference);
+ HStoreFile.Reference bReference = new HStoreFile.Reference(
+ getRegionName(), h.getFileId(), new HStoreKey(midKey),
+ HStoreFile.Range.top);
+ HStoreFile b = new HStoreFile(this.conf, splits,
+ regionBInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
+ bReference);
+ h.splitStoreFile(a, b, this.fs);
+ }
+
+ // Done!
+ // Opening the region copies the splits files from the splits directory
+ // under each region.
HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
// Cleanup
- fs.delete(splits); // Get rid of splits directory
- fs.delete(regiondir); // and the directory for the old region
- HRegion regions[] = new HRegion[2];
- regions[0] = regionA;
- regions[1] = regionB;
- LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
- "New regions are: " + regions[0].getRegionName() + ", " +
- regions[1].getRegionName() + ". Took " +
+ boolean deleted = fs.delete(splits); // Get rid of splits directory
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
+ }
+ HRegion regions[] = new HRegion [] {regionA, regionB};
+ LOG.info("Region split of " + this.regionInfo.regionName + " complete; " +
+ "new regions: " + regions[0].getRegionName() + ", " +
+ regions[1].getRegionName() + ". Split took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
return regions;
}
- private HStoreFile splitStoreFile(final HStoreFile hsf, final Path splits,
- final HRegionInfo a, final HRegionInfo b, final Text midKey)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started splitting HStore " + hsf.getRegionName() + "/" +
- hsf.getColFamily() + "/" + hsf.fileId());
+ private void checkMidKey(final Text midKey) throws IOException {
+ if(((this.regionInfo.startKey.getLength() != 0)
+ && (this.regionInfo.startKey.compareTo(midKey) > 0))
+ || ((this.regionInfo.endKey.getLength() != 0)
+ && (this.regionInfo.endKey.compareTo(midKey) < 0))) {
+ throw new IOException("Region splitkey must lie within region " +
+ "boundaries.");
}
- HStoreFile dstA = new HStoreFile(conf, splits, a.regionName,
- hsf.getColFamily(), Math.abs(rand.nextLong()));
- HStoreFile dstB = new HStoreFile(conf, splits, b.regionName,
- hsf.getColFamily(), Math.abs(rand.nextLong()));
- hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished splitting HStore " + hsf.getRegionName() + "/" +
- hsf.getColFamily() + "/" + hsf.fileId());
+ }
+
+ private Path getSplitRegionDir(final Path splits, final Text regionName) {
+ return HRegion.getRegionDir(splits, regionName);
+ }
+
+ private Path getSplitsDir() throws IOException {
+ Path splits = new Path(this.regiondir, SPLITDIR);
+ if(!this.fs.exists(splits)) {
+ this.fs.mkdirs(splits);
}
- return hsf;
+ return splits;
}
//////////////////////////////////////////////////////////////////////////////
@@ -591,22 +569,22 @@
/** @return start key for region */
public Text getStartKey() {
- return regionInfo.startKey;
+ return this.regionInfo.startKey;
}
/** @return end key for region */
public Text getEndKey() {
- return regionInfo.endKey;
+ return this.regionInfo.endKey;
}
/** @return region id */
public long getRegionId() {
- return regionInfo.regionId;
+ return this.regionInfo.regionId;
}
/** @return region name */
public Text getRegionName() {
- return regionInfo.regionName;
+ return this.regionInfo.regionName;
}
/** @return root directory path */
@@ -616,27 +594,27 @@
/** @return HTableDescriptor for this region */
public HTableDescriptor getTableDesc() {
- return regionInfo.tableDesc;
+ return this.regionInfo.tableDesc;
}
/** @return HLog in use for this region */
public HLog getLog() {
- return log;
+ return this.log;
}
/** @return Configuration object */
public Configuration getConf() {
- return conf;
+ return this.conf;
}
/** @return region directory Path */
public Path getRegionDir() {
- return regiondir;
+ return this.regiondir;
}
/** @return FileSystem being used by this region */
public FileSystem getFilesystem() {
- return fs;
+ return this.fs;
}
//////////////////////////////////////////////////////////////////////////////
@@ -646,63 +624,74 @@
// upkeep.
//////////////////////////////////////////////////////////////////////////////
- /**
+ /*
* Iterates through all the HStores and finds the one with the largest
* MapFile size. If the size is greater than the (currently hard-coded)
* threshold, returns true indicating that the region should be split. The
* midKey for the largest MapFile is returned through the midKey parameter.
- *
- * @param midKey - (return value) midKey of the largest MapFile
- * @return - true if the region should be split
+ * It is possible for us to rule the region non-splitable even in excess of
+ * configured size. This happens if region contains a reference file. If
+ * a reference file, the region can not be split.
+ * @param midKey midKey of the largest MapFile
+ * @return true if the region should be split. midKey is set by this method.
+ * Check it for a midKey value on return.
*/
boolean needsSplit(Text midKey) {
lock.obtainReadLock();
try {
- Text key = new Text();
- long maxSize = 0;
- long aggregateSize = 0;
- for(HStore store: stores.values()) {
- long size = store.getLargestFileSize(key);
- aggregateSize += size;
- if(size > maxSize) { // Largest so far
- maxSize = size;
- midKey.set(key);
- }
- }
+ HStore.HStoreSize biggest = largestHStore(midKey);
long triggerSize =
this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
- boolean split = (maxSize >= triggerSize || aggregateSize >= triggerSize);
+ boolean split = (biggest.getAggregate() >= triggerSize);
if (split) {
- LOG.info("Splitting " + getRegionName().toString() +
- " because largest file is " + StringUtils.humanReadableInt(maxSize) +
- ", aggregate size is " +
- StringUtils.humanReadableInt(aggregateSize) +
- " and desired size is " +
- StringUtils.humanReadableInt(this.desiredMaxFileSize));
+ if (!biggest.isSplitable()) {
+ LOG.warn("Region " + getRegionName().toString() +
+ " is NOT splitable though its aggregate size is " +
+ StringUtils.humanReadableInt(biggest.getAggregate()) +
+ " and desired size is " +
+ StringUtils.humanReadableInt(this.desiredMaxFileSize));
+ split = false;
+ } else {
+ LOG.info("Splitting " + getRegionName().toString() +
+ " because largest aggregate size is " +
+ StringUtils.humanReadableInt(biggest.getAggregate()) +
+ " and desired size is " +
+ StringUtils.humanReadableInt(this.desiredMaxFileSize));
+ }
}
return split;
} finally {
lock.releaseReadLock();
}
}
-
+
/**
- * @return - returns the size of the largest HStore
- */
- long largestHStore() {
- long maxsize = 0;
+ * @return returns size of largest HStore. Also returns whether store is
+ * splitable or not (Its not splitable if region has a store that has a
+ * reference store file).
+ */
+ HStore.HStoreSize largestHStore(final Text midkey) {
+ HStore.HStoreSize biggest = null;
+ boolean splitable = true;
lock.obtainReadLock();
try {
- Text key = new Text();
for(HStore h: stores.values()) {
- long size = h.getLargestFileSize(key);
-
- if(size > maxsize) { // Largest so far
- maxsize = size;
+ HStore.HStoreSize size = h.size(midkey);
+ // If we came across a reference down in the store, then propagate
+ // fact that region is not splitable.
+ if (splitable) {
+ splitable = size.splitable;
+ }
+ if (biggest == null) {
+ biggest = size;
+ continue;
+ }
+ if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
+ biggest = size;
}
}
- return maxsize;
-
+ biggest.setSplitable(splitable);
+ return biggest;
} finally {
lock.releaseReadLock();
}
@@ -891,7 +880,6 @@
* not flush, returns list of all store files.
*/
Vector<HStoreFile> internalFlushcache() throws IOException {
-
long startTime = -1;
if(LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
@@ -911,12 +899,14 @@
// explicitly cleaned up using a call to deleteSnapshot().
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
if(retval == null || retval.memcacheSnapshot == null) {
+ LOG.debug("Finished memcache flush; empty snapshot");
return getAllStoreFiles();
}
long logCacheFlushId = retval.sequenceId;
if(LOG.isDebugEnabled()) {
LOG.debug("Snapshotted memcache for region " +
- this.regionInfo.regionName + ". Sequence id " + retval.sequenceId);
+ this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
+ " and entries " + retval.memcacheSnapshot.size());
}
// A. Flush memcache to all the HStores.
@@ -1272,7 +1262,7 @@
+ lockid + " unexpected aborted by another thread");
}
- this.targetColumns.remove(lockid);
+ this.targetColumns.remove(Long.valueOf(lockid));
releaseRowLock(row);
}
}
@@ -1387,7 +1377,7 @@
// Pattern is that all access to rowsToLocks and/or to
// locksToRows is via a lock on rowsToLocks.
synchronized(rowsToLocks) {
- return locksToRows.get(lockid);
+ return locksToRows.get(Long.valueOf(lockid));
}
}
@@ -1398,7 +1388,7 @@
void releaseRowLock(Text row) {
synchronized(rowsToLocks) {
long lockid = rowsToLocks.remove(row).longValue();
- locksToRows.remove(lockid);
+ locksToRows.remove(Long.valueOf(lockid));
rowsToLocks.notifyAll();
}
}
@@ -1415,6 +1405,11 @@
}
}
+ @Override
+ public String toString() {
+ return getRegionName().toString();
+ }
+
/**
* HScanner is an iterator through a bunch of rows in an HRegion.
*/
@@ -1686,7 +1681,7 @@
static HRegion createHRegion(final HRegionInfo info,
final Path rootDir, final Configuration conf, final Path initialFiles)
throws IOException {
- Path regionDir = HStoreFile.getHRegionDir(rootDir, info.regionName);
+ Path regionDir = HRegion.getRegionDir(rootDir, info.regionName);
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
return new HRegion(rootDir,
@@ -1721,19 +1716,23 @@
final long startCode)
throws IOException {
HTable t = new HTable(conf, table);
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bytes);
- region.getRegionInfo().write(out);
- long lockid = t.startUpdate(region.getRegionName());
- t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
- t.put(lockid, COL_SERVER,
- serverAddress.toString().getBytes(UTF8_ENCODING));
- t.put(lockid, COL_STARTCODE,
- String.valueOf(startCode).getBytes(UTF8_ENCODING));
- t.commit(lockid);
- if (LOG.isDebugEnabled()) {
- LOG.info("Added region " + region.getRegionName() + " to table " +
- table);
+ try {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bytes);
+ region.getRegionInfo().write(out);
+ long lockid = t.startUpdate(region.getRegionName());
+ t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+ t.put(lockid, COL_SERVER,
+ serverAddress.toString().getBytes(UTF8_ENCODING));
+ t.put(lockid, COL_STARTCODE,
+ String.valueOf(startCode).getBytes(UTF8_ENCODING));
+ t.commit(lockid);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Added region " + region.getRegionName() + " to table " +
+ table);
+ }
+ } finally {
+ t.close();
}
}
@@ -1748,31 +1747,126 @@
final Text table, final Text regionName)
throws IOException {
HTable t = new HTable(conf, table);
- long lockid = t.startUpdate(regionName);
+ try {
+ removeRegionFromMETA(t, regionName);
+ } finally {
+ t.close();
+ }
+ }
+
+ /**
+ * Delete <code>region</code> from META <code>table</code>.
+ * @param conf Configuration object
+ * @param table META table we are to delete region from.
+ * @param regionName Region to remove.
+ * @throws IOException
+ */
+ static void removeRegionFromMETA(final HTable t, final Text regionName)
+ throws IOException {
+ long lockid = t.startBatchUpdate(regionName);
t.delete(lockid, COL_REGIONINFO);
t.delete(lockid, COL_SERVER);
t.delete(lockid, COL_STARTCODE);
t.commit(lockid);
if (LOG.isDebugEnabled()) {
- LOG.debug("Removed " + regionName + " from table " + table);
+ LOG.debug("Removed " + regionName + " from table " + t.getTableName());
+ }
+ }
+
+ /**
+ * Delete <code>split</code> column from META <code>table</code>.
+ * @param t
+ * @param split
+ * @param regionName Region to remove.
+ * @throws IOException
+ */
+ static void removeSplitFromMETA(final HTable t, final Text regionName,
+ final Text split)
+ throws IOException {
+ long lockid = t.startBatchUpdate(regionName);
+ t.delete(lockid, split);
+ t.commit(lockid);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed " + split + " from " + regionName +
+ " from table " + t.getTableName());
+ }
+ }
+
+ /**
+ * <code>region</code> has split. Update META <code>table</code>.
+ * @param client Client to use running update.
+ * @param table META table we are to delete region from.
+ * @param regionName Region to remove.
+ * @throws IOException
+ */
+ static void writeSplitToMETA(final Configuration conf,
+ final Text table, final Text regionName, final HRegionInfo splitA,
+ final HRegionInfo splitB)
+ throws IOException {
+ HTable t = new HTable(conf, table);
+ try {
+ HRegionInfo hri = getRegionInfo(t.get(regionName, COL_REGIONINFO));
+ hri.offLine = true;
+ hri.split = true;
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bytes);
+ hri.write(dos);
+ dos.close();
+ long lockid = t.startBatchUpdate(regionName);
+ t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+ t.put(lockid, COL_SPLITA, Writables.getBytes(splitA));
+ t.put(lockid, COL_SPLITB, Writables.getBytes(splitB));
+ t.commitBatch(lockid);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updated " + regionName + " in table " + table +
+ " on its being split");
+ }
+ } finally {
+ t.close();
}
}
/**
+ * @param whichSplit COL_SPLITA or COL_SPLITB?
* @param data Map of META row labelled column data.
- * @return Server
+ * @return HRegionInfo or null if not found.
+ * @throws IOException
+ */
+ static HRegionInfo getSplit(final TreeMap<Text, byte[]> data,
+ final Text whichSplit)
+ throws IOException {
+ if (!(whichSplit.equals(COL_SPLITA) || whichSplit.equals(COL_SPLITB))) {
+ throw new IOException("Illegal Argument: " + whichSplit);
+ }
+ byte [] bytes = data.get(whichSplit);
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+ return (HRegionInfo)((bytes == null || bytes.length == 0)?
+ null:
+ Writables.getWritable(bytes, new HRegionInfo()));
+ }
+
+ /**
+ * @param data Map of META row labelled column data.
+ * @return An HRegionInfo instance.
+ * @throws IOException
*/
static HRegionInfo getRegionInfo(final TreeMap<Text, byte[]> data)
throws IOException {
- byte[] bytes = data.get(COL_REGIONINFO);
+ return getRegionInfo(data.get(COL_REGIONINFO));
+ }
+
+ /**
+ * @param bytes Bytes of a HRegionInfo.
+ * @return An HRegionInfo instance.
+ * @throws IOException
+ */
+ static HRegionInfo getRegionInfo(final byte[] bytes) throws IOException {
if (bytes == null || bytes.length == 0) {
throw new IOException("no value for " + COL_REGIONINFO);
}
- DataInputBuffer in = new DataInputBuffer();
- in.reset(bytes, bytes.length);
- HRegionInfo info = new HRegionInfo();
- info.readFields(in);
- return info;
+ return (HRegionInfo)Writables.getWritable(bytes, new HRegionInfo());
}
/**
@@ -1810,4 +1904,52 @@
}
return startCode;
}
-}
+
+ public static Path getRegionDir(final Path dir, final Text regionName) {
+ return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
+ }
+
+
+ /**
+ * Deletes all the files for a HRegion
+ *
+ * @param fs the file system object
+ * @param baseDirectory base directory for HBase
+ * @param regionName name of the region to delete
+ * @throws IOException
+ * @return True if deleted.
+ */
+ static boolean deleteRegion(FileSystem fs, Path baseDirectory,
+ Text regionName) throws IOException {
+ Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), regionName);
+ return fs.delete(p);
+ }
+
+ /**
+ * Look for HStoreFile references in passed region.
+ * @param fs
+ * @param baseDirectory
+ * @param hri
+ * @return True if we found references.
+ * @throws IOException
+ */
+ static boolean hasReferences(final FileSystem fs, final Path baseDirectory,
+ final HRegionInfo hri)
+ throws IOException {
+ boolean result = false;
+ for (Text family: hri.getTableDesc().families().keySet()) {
+ Path p = HStoreFile.getMapDir(baseDirectory, hri.getRegionName(),
+ HStoreKey.extractFamily(family));
+ // Look for reference files.
+ Path [] ps = fs.listPaths(p, new PathFilter () {
+ public boolean accept(Path path) {
+ return HStoreFile.isReference(path);
+ }});
+ if (ps != null && ps.length > 0) {
+ result = true;
+ break;
+ }
+ }
+ return result;
+ }
+}
\ No newline at end of file
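
The COL_SPLITA/COL_SPLITB columns written by writeSplitToMETA, together with getSplit() and hasReferences(), give a later META scan enough information to decide when a split parent's files can finally be removed. A rough same-package sketch of that cleanup (hypothetical; the actual garbage collection is not part of this patch), given an already-fetched META row:

  // Hypothetical helper, assumed to live in org.apache.hadoop.hbase so the
  // package-private statics above are visible.
  static void maybeCleanupSplitParent(final FileSystem fs, final Path rootDir,
      final TreeMap<Text, byte []> metaRow) throws IOException {
    HRegionInfo parent = HRegion.getRegionInfo(metaRow);
    if (!parent.isSplit() || !parent.isOffline()) {
      return;                                     // Not a retired split parent.
    }
    HRegionInfo splitA = HRegion.getSplit(metaRow, HRegion.COL_SPLITA);
    HRegionInfo splitB = HRegion.getSplit(metaRow, HRegion.COL_SPLITB);
    // Daughters read the parent through reference store files; only when
    // neither daughter still holds references is it safe to drop the parent.
    boolean referenced =
      (splitA != null && HRegion.hasReferences(fs, rootDir, splitA)) ||
      (splitB != null && HRegion.hasReferences(fs, rootDir, splitB));
    if (!referenced) {
      HRegion.deleteRegion(fs, rootDir, parent.getRegionName());
    }
  }
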
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Wed Aug 8 13:30:13 2007
@@ -66,6 +66,7 @@
Text startKey;
Text endKey;
boolean offLine;
+ boolean split;
HTableDescriptor tableDesc;
/** Default constructor - creates empty object */
@@ -76,6 +77,7 @@
this.endKey = new Text();
this.regionName = new Text();
this.offLine = false;
+ this.split = false;
}
/**
@@ -88,6 +90,21 @@
this();
readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes)));
}
+
+ /**
+ * Construct HRegionInfo with explicit parameters
+ *
+ * @param regionId the region id
+ * @param tableDesc the table descriptor
+ * @param startKey first key in region
+ * @param endKey end of key range
+ * @throws IllegalArgumentException
+ */
+ public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
+ Text endKey)
+ throws IllegalArgumentException {
+ this(regionId, tableDesc, startKey, endKey, false);
+ }
/**
* Construct HRegionInfo with explicit parameters
@@ -96,10 +113,13 @@
* @param tableDesc the table descriptor
* @param startKey first key in region
* @param endKey end of key range
+ * @param split true if this region has split and we have daughter regions
+ * that may or may not hold references to this region.
* @throws IllegalArgumentException
*/
public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
- Text endKey) throws IllegalArgumentException {
+ Text endKey, final boolean split)
+ throws IllegalArgumentException {
this.regionId = regionId;
@@ -124,6 +144,7 @@
regionId);
this.offLine = false;
+ this.split = split;
}
/** @return the endKey */
@@ -150,6 +171,20 @@
public HTableDescriptor getTableDesc(){
return tableDesc;
}
+
+ /**
+ * @return True if has been split and has daughters.
+ */
+ public boolean isSplit() {
+ return this.split;
+ }
+
+ /**
+ * @return True if this region is offline.
+ */
+ public boolean isOffline() {
+ return this.offLine;
+ }
/**
* {@inheritDoc}
@@ -157,8 +192,10 @@
@Override
public String toString() {
return "regionname: " + this.regionName.toString() + ", startKey: <" +
- this.startKey.toString() + ">, tableDesc: {" +
- this.tableDesc.toString() + "}";
+ this.startKey.toString() + ">," +
+ (isOffline()? " offline: true,": "") +
+ (isSplit()? " split: true,": "") +
+ " tableDesc: {" + this.tableDesc.toString() + "}";
}
/**
@@ -197,6 +234,7 @@
endKey.write(out);
regionName.write(out);
out.writeBoolean(offLine);
+ out.writeBoolean(split);
}
/**
@@ -209,6 +247,7 @@
this.endKey.readFields(in);
this.regionName.readFields(in);
this.offLine = in.readBoolean();
+ this.split = in.readBoolean();
}
//
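
Because the new split flag (and offLine) are written by write() and read back in readFields(), the same Writables helpers used for COL_SPLITA/COL_SPLITB round-trip them. A minimal sketch, assuming same-package access and an HTableDescriptor named desc already built (neither is part of this patch):

  // Sketch only: desc is an assumed, pre-built HTableDescriptor.
  HRegionInfo parent = new HRegionInfo(Math.abs(new Random().nextLong()),
      desc, new Text(), new Text(), true);        // constructed with split == true
  parent.offLine = true;                          // package-private field, as writeSplitToMETA sets it
  byte [] bytes = Writables.getBytes(parent);     // same encoding stored in META
  HRegionInfo copy =
      (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo());
  // copy.isSplit() and copy.isOffline() both report true after the round trip.
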
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java Wed Aug 8 13:30:13 2007
@@ -91,4 +91,4 @@
}
return result;
}
-}
+}
\ No newline at end of file
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Wed Aug 8 13:30:13 2007
@@ -113,7 +113,7 @@
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
- LOG.debug(regionName.toString() + "closing (" +
+ LOG.debug(regionName.toString() + " closing (" +
"Adding to retiringRegions)");
}
} finally {
@@ -199,15 +199,16 @@
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
final Text tableToUpdate =
- region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
- ROOT_TABLE_NAME : META_TABLE_NAME;
+ region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)?
+ ROOT_TABLE_NAME : META_TABLE_NAME;
LOG.info("Updating " + tableToUpdate + " with region split info");
// Remove old region from META
for (int tries = 0; tries < numRetries; tries++) {
try {
- HRegion.removeRegionFromMETA(conf, tableToUpdate,
- region.getRegionName());
+ HRegion.writeSplitToMETA(conf, tableToUpdate,
+ region.getRegionName(), newRegions[0].getRegionInfo(),
+ newRegions[1].getRegionInfo());
break;
} catch (IOException e) {
if(tries == numRetries - 1) {
@@ -242,18 +243,17 @@
LOG.debug("Reporting region split to master");
}
reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
- newRegions[1].getRegionInfo());
+ newRegions[1].getRegionInfo());
LOG.info("region split, META update, and report to master all" +
- " successful. Old region=" + oldRegionInfo.getRegionName() +
- ", new regions: " + newRegions[0].getRegionName() + ", " +
- newRegions[1].getRegionName());
+ " successful. Old region=" + oldRegionInfo.getRegionName() +
+ ", new regions: " + newRegions[0].getRegionName() + ", " +
+ newRegions[1].getRegionName());
// Finally, start serving the new regions
lock.writeLock().lock();
try {
onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]);
-
} finally {
lock.writeLock().unlock();
}
@@ -307,7 +307,7 @@
iex = x;
}
}
- LOG.error(iex);
+ LOG.error("", iex);
}
}
}
@@ -347,10 +347,10 @@
* {@inheritDoc}
*/
public void run() {
- while(! stopRequested) {
+ while(!stopRequested) {
synchronized(logRollerLock) {
- // If the number of log entries is high enough, roll the log. This is a
- // very fast operation, but should not be done too frequently.
+ // If the number of log entries is high enough, roll the log. This
+ // is a very fast operation, but should not be done too frequently.
int nEntries = log.getNumEntries();
if(nEntries > this.maxLogEntries) {
try {
@@ -359,17 +359,16 @@
} catch (IOException iex) {
if (iex instanceof RemoteException) {
try {
- iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
-
+ iex = RemoteExceptionHandler.
+ decodeRemoteException((RemoteException) iex);
} catch (IOException x) {
iex = x;
}
}
- LOG.warn(iex);
+ LOG.warn("", iex);
}
}
}
-
if(!stopRequested) {
try {
Thread.sleep(threadWakeFrequency);
@@ -586,7 +585,7 @@
e = ex;
}
}
- LOG.error(e);
+ LOG.error("", e);
}
while(! stopRequested) {
@@ -653,9 +652,6 @@
break;
default:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got default message");
- }
try {
toDo.put(new ToDoEntry(msgs[i]));
} catch (InterruptedException e) {
@@ -678,7 +674,7 @@
e = ex;
}
}
- LOG.error(e);
+ LOG.error("", e);
}
}
@@ -716,16 +712,16 @@
if (abortRequested) {
try {
log.close();
+ LOG.info("On abort, closed hlog");
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
} catch (IOException ex) {
e = ex;
}
}
- LOG.warn(e);
+ LOG.warn("Abort close of log", e);
}
closeAllRegions(); // Don't leave any open file handles
LOG.info("aborting server at: " +
@@ -743,7 +739,7 @@
e = ex;
}
}
- LOG.error(e);
+ LOG.error("", e);
}
try {
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
@@ -767,7 +763,7 @@
e = ex;
}
}
- LOG.warn(e);
+ LOG.warn("", e);
}
LOG.info("stopping server at: " +
serverInfo.getServerAddress().toString());
@@ -1113,7 +1109,7 @@
iex = x;
}
}
- LOG.error(iex);
+ LOG.error("", iex);
}
}
}
@@ -1267,12 +1263,11 @@
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
} catch (IOException x) {
e = x;
}
}
- LOG.error(e);
+ LOG.error("", e);
throw e;
}
return scannerId;
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Aug 8 13:30:13 2007
@@ -33,21 +33,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.StringUtils;
-
-import org.onelab.filter.*;
+import org.onelab.filter.BloomFilter;
+import org.onelab.filter.CountingBloomFilter;
+import org.onelab.filter.Filter;
+import org.onelab.filter.RetouchedBloomFilter;
/**
* HStore maintains a bunch of data files. It is responsible for maintaining
@@ -86,8 +86,8 @@
final HLocking lock = new HLocking();
- TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
- TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
+ TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
+ TreeMap<Long, MapFile.Reader> readers = new TreeMap<Long, MapFile.Reader>();
Random rand = new Random();
@@ -137,10 +137,9 @@
if(family.getCompression() != HColumnDescriptor.CompressionType.NONE) {
if(family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
this.compression = SequenceFile.CompressionType.BLOCK;
-
- } else if(family.getCompression() == HColumnDescriptor.CompressionType.RECORD) {
+ } else if(family.getCompression() ==
+ HColumnDescriptor.CompressionType.RECORD) {
this.compression = SequenceFile.CompressionType.RECORD;
-
} else {
assert(false);
}
@@ -148,16 +147,13 @@
this.fs = fs;
this.conf = conf;
-
this.mapdir = HStoreFile.getMapDir(dir, regionName, familyName);
fs.mkdirs(mapdir);
this.loginfodir = HStoreFile.getInfoDir(dir, regionName, familyName);
fs.mkdirs(loginfodir);
-
if(family.bloomFilter == null) {
this.filterDir = null;
this.bloomFilter = null;
-
} else {
this.filterDir = HStoreFile.getFilterDir(dir, regionName, familyName);
fs.mkdirs(filterDir);
@@ -165,13 +161,15 @@
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Starting HStore for " + this.storeName);
+ LOG.debug("starting " + this.storeName +
+ ((reconstructionLog == null)?
+ " (no reconstruction log)": " with reconstruction log: " +
+ reconstructionLog.toString()));
}
// Either restart or get rid of any leftover compaction work. Either way,
// by the time processReadyCompaction() returns, we can get rid of the
// existing compaction-dir.
-
this.compactdir = new Path(dir, COMPACTION_DIR);
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
@@ -187,7 +185,7 @@
Vector<HStoreFile> hstoreFiles
= HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
for(HStoreFile hsf: hstoreFiles) {
- mapFiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
+ this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
}
// Now go through all the HSTORE_LOGINFOFILEs and figure out the
@@ -209,21 +207,24 @@
}
doReconstructionLog(reconstructionLog, maxSeqID);
-
+
// Compact all the MapFiles into a single file. The resulting MapFile
// should be "timeless"; that is, it should not have an associated seq-ID,
// because all log messages have been reflected in the TreeMaps at this
- // point.
- if(mapFiles.size() >= 1) {
+ // point.
+ //
+ // TODO: Only do the compaction if we are over a threshold, not
+ // every time. Not necessary if only two or three store files. Fix after
+ // revamp of compaction.
+ if(storefiles.size() > 1) {
compactHelper(true);
}
-
+
// Finally, start up all the map readers! (There should be just one at this
// point, as we've compacted them all.)
- for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
- // TODO - is this really necessary? Don't I do this inside compact()?
- maps.put(e.getKey(),
- getMapFileReader(e.getValue().getMapFilePath().toString()));
+ for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+ this.readers.put(e.getKey(),
+ e.getValue().getReader(this.fs, this.bloomFilter));
}
}
@@ -239,6 +240,11 @@
final long maxSeqID)
throws UnsupportedEncodingException, IOException {
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
+ if (reconstructionLog != null && !fs.exists(reconstructionLog)) {
+ LOG.warn("Passed reconstruction log " + reconstructionLog +
+ " does not exist");
+ }
+ // Nothing to do.
return;
}
long maxSeqIdInLog = -1;
@@ -271,8 +277,7 @@
}
HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
if (LOG.isDebugEnabled()) {
- LOG.debug("Applying edit " + k.toString() + "=" +
- new String(val.getVal(), UTF8_ENCODING));
+ LOG.debug("Applying edit " + k.toString());
}
reconstructedCache.put(k, val.getVal());
}
@@ -364,144 +369,24 @@
LOG.debug("flushed bloom filter for " + this.storeName);
}
}
-
- /** Generates a bloom filter key from the row and column keys */
- Key getBloomFilterKey(HStoreKey k) {
- StringBuilder s = new StringBuilder(k.getRow().toString());
- s.append(k.getColumn().toString());
-
- byte[] bytes = null;
- try {
- bytes = s.toString().getBytes(HConstants.UTF8_ENCODING);
-
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- assert(false);
- }
- return new Key(bytes);
- }
-
- /**
- * Extends MapFile.Reader and overrides get and getClosest to consult the
- * bloom filter before attempting to read from disk.
- */
- private class BloomFilterReader extends MapFile.Reader {
-
- BloomFilterReader(FileSystem fs, String dirName, Configuration conf)
- throws IOException {
- super(fs, dirName, conf);
- }
-
- /** {@inheritDoc} */
- @Override
- public Writable get(WritableComparable key, Writable val) throws IOException {
- // Note - the key being passed to us is always a HStoreKey
-
- if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("bloom filter reported that key exists");
- }
- return super.get(key, val);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("bloom filter reported that key does not exist");
- }
- return null;
- }
-
- /** {@inheritDoc} */
- @Override
- public WritableComparable getClosest(WritableComparable key, Writable val)
- throws IOException {
- // Note - the key being passed to us is always a HStoreKey
-
- if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("bloom filter reported that key exists");
- }
- return super.getClosest(key, val);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("bloom filter reported that key does not exist");
- }
- return null;
- }
- }
-
- /**
- * Extends MapFile.Writer and overrides append, so that whenever a MapFile
- * is written to, the key is added to the bloom filter.
- */
- private class BloomFilterWriter extends MapFile.Writer {
-
- @SuppressWarnings("unchecked")
- BloomFilterWriter(Configuration conf, FileSystem fs, String dirName,
- Class keyClass, Class valClass, SequenceFile.CompressionType compression)
- throws IOException {
- super(conf, fs, dirName, keyClass, valClass, compression);
- }
-
- /** {@inheritDoc} */
- @Override
- public void append(WritableComparable key, Writable val) throws IOException {
- // Note - the key being passed to us is always a HStoreKey
-
- bloomFilter.add(getBloomFilterKey((HStoreKey)key));
- super.append(key, val);
- }
- }
-
- /**
- * Get a MapFile reader
- * This allows us to substitute a BloomFilterReader if a bloom filter is enabled
- */
- MapFile.Reader getMapFileReader(String dirName) throws IOException {
- if(bloomFilter != null) {
- return new BloomFilterReader(fs, dirName, conf);
- }
- return new MapFile.Reader(fs, dirName, conf);
- }
-
- /**
- * Get a MapFile writer
- * This allows us to substitute a BloomFilterWriter if a bloom filter is
- * enabled
- *
- * @param dirName Directory with store files.
- * @return Map file.
- * @throws IOException
- */
- MapFile.Writer getMapFileWriter(String dirName) throws IOException {
- if (bloomFilter != null) {
- return new BloomFilterWriter(conf, fs, dirName, HStoreKey.class,
- ImmutableBytesWritable.class, compression);
- }
- return new MapFile.Writer(conf, fs, dirName, HStoreKey.class,
- ImmutableBytesWritable.class, compression);
- }
//////////////////////////////////////////////////////////////////////////////
// End bloom filters
//////////////////////////////////////////////////////////////////////////////
/**
- * Turn off all the MapFile readers
- *
+ * Close all the MapFile readers
* @throws IOException
*/
void close() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.info("closing HStore for " + this.storeName);
- }
this.lock.obtainWriteLock();
try {
- for (MapFile.Reader map: maps.values()) {
- map.close();
+ for (MapFile.Reader reader: this.readers.values()) {
+ reader.close();
}
- maps.clear();
- mapFiles.clear();
-
- LOG.info("HStore closed for " + this.storeName);
+ this.readers.clear();
+ this.storefiles.clear();
+ LOG.info("closed " + this.storeName);
} finally {
this.lock.releaseWriteLock();
}
@@ -540,17 +425,19 @@
// A. Write the TreeMap out to the disk
HStoreFile flushedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
regionName, familyName, fs);
- Path mapfile = flushedFile.getMapFilePath();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Flushing to " + mapfile.toString());
- }
- MapFile.Writer out = getMapFileWriter(mapfile.toString());
+ String name = flushedFile.toString();
+ MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
+ this.bloomFilter);
+ int count = 0;
+ int total = 0;
try {
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
+ total++;
if (this.familyName.
equals(HStoreKey.extractFamily(curkey.getColumn()))) {
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+ count++;
}
}
} finally {
@@ -571,13 +458,14 @@
this.lock.obtainWriteLock();
try {
Long flushid = Long.valueOf(logCacheFlushId);
- maps.put(flushid, getMapFileReader(mapfile.toString()));
- mapFiles.put(flushid, flushedFile);
+ // Open the map file reader.
+ this.readers.put(flushid,
+ flushedFile.getReader(this.fs, this.bloomFilter));
+ this.storefiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
- LOG.debug("Added " + mapfile.toString() +
- " with flush id " + logCacheFlushId + " and size " +
- StringUtils.humanReadableInt(mapfile.getFileSystem(this.conf).
- getContentLength(mapfile)));
+ LOG.debug("Added " + name +
+ " with sequence id " + logCacheFlushId + " and size " +
+ StringUtils.humanReadableInt(flushedFile.length()));
}
} finally {
this.lock.releaseWriteLock();
@@ -593,8 +481,7 @@
Vector<HStoreFile> getAllMapFiles() {
this.lock.obtainReadLock();
try {
- return new Vector<HStoreFile>(mapFiles.values());
-
+ return new Vector<HStoreFile>(storefiles.values());
} finally {
this.lock.releaseReadLock();
}
@@ -632,7 +519,7 @@
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
fs.mkdirs(curCompactStore);
if(LOG.isDebugEnabled()) {
- LOG.debug("started compaction of " + mapFiles.size() + " files in " +
+ LOG.debug("started compaction of " + storefiles.size() + " files in " +
curCompactStore.toString());
}
try {
@@ -640,7 +527,7 @@
Vector<HStoreFile> toCompactFiles = null;
this.lock.obtainWriteLock();
try {
- toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
+ toCompactFiles = new Vector<HStoreFile>(storefiles.values());
} finally {
this.lock.releaseWriteLock();
}
@@ -660,6 +547,7 @@
HStoreFile compactedOutputFile
= new HStoreFile(conf, compactdir, regionName, familyName, -1);
if(toCompactFiles.size() == 1) {
+ // TODO: Only rewrite if NOT a HSF reference file.
if(LOG.isDebugEnabled()) {
LOG.debug("nothing to compact for " + this.storeName);
}
@@ -671,7 +559,8 @@
// Step through them, writing to the brand-new TreeMap
MapFile.Writer compactedOut =
- getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
+ compactedOutputFile.getWriter(this.fs, this.compression,
+ this.bloomFilter);
try {
// We create a new set of MapFile.Reader objects so we don't screw up
// the caching associated with the currently-loaded ones.
@@ -684,14 +573,14 @@
// lowest-ranked one. Updates to a single row/column will appear
// ranked by timestamp. This allows us to throw out deleted values or
// obsolete versions.
- MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
+ MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()];
HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
ImmutableBytesWritable[] vals =
new ImmutableBytesWritable[toCompactFiles.size()];
boolean[] done = new boolean[toCompactFiles.size()];
int pos = 0;
for(HStoreFile hsf: toCompactFiles) {
- readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
+ rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter);
keys[pos] = new HStoreKey();
vals[pos] = new ImmutableBytesWritable();
done[pos] = false;
@@ -701,9 +590,9 @@
// Now, advance through the readers in order. This will have the
// effect of a run-time sort of the entire dataset.
int numDone = 0;
- for(int i = 0; i < readers.length; i++) {
- readers[i].reset();
- done[i] = ! readers[i].next(keys[i], vals[i]);
+ for(int i = 0; i < rdrs.length; i++) {
+ rdrs[i].reset();
+ done[i] = ! rdrs[i].next(keys[i], vals[i]);
if(done[i]) {
numDone++;
}
@@ -715,7 +604,7 @@
while(numDone < done.length) {
// Find the reader with the smallest key
int smallestKey = -1;
- for(int i = 0; i < readers.length; i++) {
+ for(int i = 0; i < rdrs.length; i++) {
if(done[i]) {
continue;
}
@@ -760,10 +649,10 @@
// Advance the smallest key. If that reader's all finished, then
// mark it as done.
- if(! readers[smallestKey].next(keys[smallestKey],
+ if(! rdrs[smallestKey].next(keys[smallestKey],
vals[smallestKey])) {
done[smallestKey] = true;
- readers[smallestKey].close();
+ rdrs[smallestKey].close();
numDone++;
}
}
@@ -772,8 +661,7 @@
}
if(LOG.isDebugEnabled()) {
- LOG.debug("writing new compacted HStore to " +
- compactedOutputFile.getMapFilePath().toString());
+ LOG.debug("writing new compacted HStore " + compactedOutputFile);
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
@@ -818,7 +706,6 @@
* invoked at HStore startup, if the prior execution died midway through.
*/
void processReadyCompaction() throws IOException {
-
// Move the compacted TreeMap into place.
// That means:
// 1) Acquiring the write-lock
@@ -830,29 +717,20 @@
// 7) Releasing the write-lock
// 1. Acquiring the write-lock
-
-
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
this.lock.obtainWriteLock();
try {
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
if(!fs.exists(doneFile)) {
-
// The last execution didn't finish the compaction, so there's nothing
// we can do. We'll just have to redo it. Abandon it and return.
LOG.warn("Redoing a failed compaction");
return;
}
- // OK, there's actually compaction work that needs to be put into place.
- if(LOG.isDebugEnabled()) {
- LOG.debug("Process ready compaction starting");
- }
-
// 2. Load in the files to be deleted.
// (Figuring out what MapFiles are going to be replaced)
-
Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
DataInputStream in = new DataInputStream(fs.open(filesToReplace));
@@ -868,41 +746,29 @@
in.close();
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("loaded " + toCompactFiles.size() +
- " file(s) to be deleted");
- }
-
- // 3. Unload all the replaced MapFiles.
- Iterator<HStoreFile> it2 = mapFiles.values().iterator();
- for(Iterator<MapFile.Reader> it = maps.values().iterator();
- it.hasNext(); ) {
- MapFile.Reader curReader = it.next();
- HStoreFile curMapFile = it2.next();
- if(toCompactFiles.contains(curMapFile)) {
- curReader.close();
- it.remove();
- }
- }
-
- for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
- HStoreFile curMapFile = it.next();
- if(toCompactFiles.contains(curMapFile)) {
- it.remove();
- }
- }
+ // 3. Unload all the replaced MapFiles. Do it by getting keys of all
+ // to remove. Then cycling on keys, removing, closing and deleting.
// What if we crash at this point? No big deal; we will restart
// processReadyCompaction(), and nothing has been lost.
-
- // 4. Delete all the old files, no longer needed
- for(HStoreFile hsf: toCompactFiles) {
- fs.delete(hsf.getMapFilePath());
- fs.delete(hsf.getInfoFilePath());
+ Vector<Long> keys = new Vector<Long>(toCompactFiles.size());
+ for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
+ if(toCompactFiles.contains(e.getValue())) {
+ keys.add(e.getKey());
+ }
}
+ for (Long key: keys) {
+ MapFile.Reader reader = this.readers.remove(key);
+ if (reader != null) {
+ reader.close();
+ }
+ HStoreFile hsf = this.storefiles.remove(key);
+ // 4. Delete all old files, no longer needed
+ hsf.delete();
+ }
if(LOG.isDebugEnabled()) {
- LOG.debug("old file(s) deleted");
+ LOG.debug("deleted " + toCompactFiles.size() + " old file(s)");
}
// What if we fail now? The above deletes will fail silently. We'd better
@@ -915,24 +781,19 @@
HStoreFile finalCompactedFile
= HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
if(LOG.isDebugEnabled()) {
- LOG.debug("moving " + compactedFile.getMapFilePath().toString() +
- " to " + finalCompactedFile.getMapFilePath().toString());
+ LOG.debug("moving " + compactedFile.toString() + " in " +
+ compactdir.toString() +
+ " to " + finalCompactedFile.toString() + " in " + dir.toString());
}
-
- fs.rename(compactedFile.getMapFilePath(),
- finalCompactedFile.getMapFilePath());
-
- // Fail here? No problem.
- fs.rename(compactedFile.getInfoFilePath(),
- finalCompactedFile.getInfoFilePath());
+ compactedFile.rename(this.fs, finalCompactedFile);
// Fail here? No worries.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
// 6. Loading the new TreeMap.
- mapFiles.put(orderVal, finalCompactedFile);
- maps.put(orderVal, getMapFileReader(
- finalCompactedFile.getMapFilePath().toString()));
+ this.readers.put(orderVal,
+ finalCompactedFile.getReader(this.fs, this.bloomFilter));
+ this.storefiles.put(orderVal, finalCompactedFile);
} finally {
// 7. Releasing the write-lock
@@ -955,8 +816,7 @@
throws IOException {
this.lock.obtainReadLock();
try {
- MapFile.Reader[] maparray
- = maps.values().toArray(new MapFile.Reader[maps.size()]);
+ MapFile.Reader[] maparray = getReaders();
for (int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
@@ -984,6 +844,11 @@
this.lock.releaseReadLock();
}
}
+
+ private MapFile.Reader [] getReaders() {
+ return this.readers.values().
+ toArray(new MapFile.Reader[this.readers.size()]);
+ }
/**
* Get the value for the indicated HStoreKey. Grab the target value and the
@@ -999,10 +864,8 @@
List<byte []> results = new ArrayList<byte []>();
this.lock.obtainReadLock();
try {
- MapFile.Reader[] maparray
- = maps.values().toArray(new MapFile.Reader[maps.size()]);
-
- for(int i = maparray.length-1; i >= 0; i--) {
+ MapFile.Reader[] maparray = getReaders();
+ for(int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
@@ -1044,40 +907,83 @@
}
}
+ /*
+ * Data structure to hold result of a look at store file sizes.
+ */
+ class HStoreSize {
+ final long aggregate;
+ final long largest;
+ boolean splitable;
+
+ HStoreSize(final long a, final long l, final boolean s) {
+ this.aggregate = a;
+ this.largest = l;
+ this.splitable = s;
+ }
+
+ long getAggregate() {
+ return this.aggregate;
+ }
+
+ long getLargest() {
+ return this.largest;
+ }
+
+ boolean isSplitable() {
+ return this.splitable;
+ }
+
+ void setSplitable(final boolean s) {
+ this.splitable = s;
+ }
+ }
+
/**
- * Gets the size of the largest MapFile and its mid key.
+ * Gets size for the store.
*
- * @param midKey - the middle key for the largest MapFile
- * @return - size of the largest MapFile
+ * @param midKey Gets set to the middle key of the largest splitable store
+ * file or its set to empty if largest is not splitable.
+ * @return Sizes for the store and the passed <code>midKey</code> is
+ * set to midKey of largest splitable. Otherwise, its set to empty
+ * to indicate we couldn't find a midkey to split on
*/
- long getLargestFileSize(Text midKey) {
+ HStoreSize size(Text midKey) {
long maxSize = 0L;
- if (this.mapFiles.size() <= 0) {
- return maxSize;
+ long aggregateSize = 0L;
+ // Not splitable if we find a reference store file present in the store.
+ boolean splitable = true;
+ if (this.storefiles.size() <= 0) {
+ return new HStoreSize(0, 0, splitable);
}
this.lock.obtainReadLock();
try {
Long mapIndex = Long.valueOf(0L);
// Iterate through all the MapFiles
- for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
+ for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
HStoreFile curHSF = e.getValue();
- long size = fs.getFileStatus(
- new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME)).getLen();
- if(size > maxSize) { // This is the largest one so far
+ long size = curHSF.length();
+ aggregateSize += size;
+ if (maxSize == 0L || size > maxSize) {
+ // This is the largest one so far
maxSize = size;
mapIndex = e.getKey();
}
+ if (splitable) {
+ splitable = !curHSF.isReference();
+ }
+ }
+ MapFile.Reader r = this.readers.get(mapIndex);
+ WritableComparable midkey = r.midKey();
+ if (midkey != null) {
+ midKey.set(((HStoreKey)midkey).getRow());
}
-
- MapFile.Reader r = maps.get(mapIndex);
- midKey.set(((HStoreKey)r.midKey()).getRow());
} catch(IOException e) {
- LOG.warn(e);
+ LOG.warn("", e);
} finally {
this.lock.releaseReadLock();
}
- return maxSize;
+ return new HStoreSize(aggregateSize, maxSize, splitable);
}
/**
@@ -1086,7 +992,7 @@
int getNMaps() {
this.lock.obtainReadLock();
try {
- return maps.size();
+ return storefiles.size();
} finally {
this.lock.releaseReadLock();
@@ -1127,20 +1033,17 @@
private MapFile.Reader[] readers;
HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
- throws IOException {
-
+ throws IOException {
super(timestamp, targetCols);
-
lock.obtainReadLock();
try {
- this.readers = new MapFile.Reader[mapFiles.size()];
+ this.readers = new MapFile.Reader[storefiles.size()];
// Most recent map file should be first
int i = readers.length - 1;
- for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
- HStoreFile curHSF = it.next();
- readers[i--] = getMapFileReader(curHSF.getMapFilePath().toString());
+ for(HStoreFile curHSF: storefiles.values()) {
+ readers[i--] = curHSF.getReader(fs, bloomFilter);
}
this.keys = new HStoreKey[readers.length];
@@ -1164,7 +1067,7 @@
}
} catch (Exception ex) {
- LOG.error(ex);
+ LOG.error("Failed construction", ex);
close();
}
}
@@ -1218,9 +1121,8 @@
if(readers[i] != null) {
try {
readers[i].close();
-
} catch(IOException e) {
- LOG.error(e);
+ LOG.error("Sub-scanner close", e);
}
}
@@ -1240,9 +1142,8 @@
if(readers[i] != null) {
try {
readers[i].close();
-
} catch(IOException e) {
- LOG.error(e);
+ LOG.error("Scanner close", e);
}
}
}
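
The HStoreSize returned by size() is what HRegion.largestHStore() and needsSplit() (earlier in this commit) consume when deciding whether to call closeAndSplit(). A condensed sketch of that decision, assuming a single HStore named store and the region's desiredMaxFileSize in scope (names as in the patch):

  // Sketch of the split decision only; store and desiredMaxFileSize are assumed.
  Text midKey = new Text();
  HStore.HStoreSize biggest = store.size(midKey);  // also fills in midKey
  long triggerSize = desiredMaxFileSize + (desiredMaxFileSize / 2);
  boolean split = biggest.isSplitable()            // false if any reference file present
      && biggest.getAggregate() >= triggerSize;
  // When split is true, midKey is the row handed to closeAndSplit(midKey, listener).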