Posted to commits@hbase.apache.org by mb...@apache.org on 2013/03/05 19:25:45 UTC

svn commit: r1452936 [1/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/backup/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/balancer/ main/java/org/ap...

Author: mbertozzi
Date: Tue Mar  5 18:25:44 2013
New Revision: 1452936

URL: http://svn.apache.org/r1452936
Log:
HBASE-7808 Refactor Store to use HRegionFileSystem

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHFileArchiveUtil.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java Tue Mar  5 18:25:44 2013
@@ -186,19 +186,19 @@ public class HFileArchiver {
    * Remove the store files, either by archiving them or outright deletion
    * @param conf {@link Configuration} to examine to determine the archive directory
    * @param fs the filesystem where the store files live
-   * @param parent Parent region hosting the store files
+   * @param regionInfo {@link HRegionInfo} of the region hosting the store files
    * @param family the family hosting the store files
    * @param compactedFiles files to be disposed of. No further reading of these files should be
    *          attempted; otherwise likely to cause an {@link IOException}
    * @throws IOException if the files could not be correctly disposed.
    */
-  public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegion parent,
-      byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
+  public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
+      Path tableDir, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
 
     // sometimes in testing, we don't have rss, so we need to check for that
     if (fs == null) {
       LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
-          + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family));
+          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
       deleteStoreFilesWithoutArchiving(compactedFiles);
       return;
     }
@@ -210,10 +210,10 @@ public class HFileArchiver {
     }
 
     // build the archive path
-    if (parent == null || family == null) throw new IOException(
-        "Need to have a parent region and a family to archive from.");
+    if (regionInfo == null || family == null) throw new IOException(
+        "Need to have a region and a family to archive from.");
 
-    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
 
     // make sure we don't archive if we can't and that the archive dir exists
     if (!fs.mkdirs(storeArchiveDir)) {
@@ -231,7 +231,36 @@ public class HFileArchiver {
     // do the actual archive
     if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
       throw new IOException("Failed to archive/delete all the files for region:"
-          + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family)
+          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
+          + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+    }
+  }
+
+  /**
+   * Archive the store file
+   * @param fs the filesystem where the store files live
+   * @param regionInfo region hosting the store files
+   * @param conf {@link Configuration} to examine to determine the archive directory
+   * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+   * @param family the family hosting the store files
+   * @param storeFile file to be archived
+   * @throws IOException if the files could not be correctly disposed.
+   */
+  public static void archiveStoreFile(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
+      Path tableDir, byte[] family, Path storeFile) throws IOException {
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
+    // make sure we don't archive if we can't and that the archive dir exists
+    if (!fs.mkdirs(storeArchiveDir)) {
+      throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+          + Bytes.toString(family) + ", deleting compacted files instead.");
+    }
+
+    // do the actual archive
+    long start = EnvironmentEdgeManager.currentTimeMillis();
+    File file = new FileablePath(fs, storeFile);
+    if (!resolveAndArchiveFile(storeArchiveDir, file, Long.toString(start))) {
+      throw new IOException("Failed to archive/delete the file for region:"
+          + regionInfo.getRegionNameAsString() + ", family:" + Bytes.toString(family)
           + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
     }
   }

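For illustration, a minimal sketch of calling the new single-file entry point (the table path, family name, and hfile name below are made up; only the archiveStoreFile signature comes from this patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.backup.HFileArchiver;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ArchiveOneFileSketch {
      // Archive a single store file without holding an HRegion handle.
      static void archive(HRegionInfo regionInfo) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path tableDir = new Path("/hbase/mytable");              // illustrative
        Path storeFile = new Path(tableDir, regionInfo.getEncodedName()
            + "/cf/abcdef0123456789");                           // illustrative hfile
        HFileArchiver.archiveStoreFile(conf, fs, regionInfo, tableDir,
            Bytes.toBytes("cf"), storeFile);
      }
    }
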
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java Tue Mar  5 18:25:44 2013
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 
@@ -67,12 +67,12 @@ public class HFileLink extends FileLink 
    */
   public static final String LINK_NAME_REGEX =
     String.format("%s=%s-%s", HTableDescriptor.VALID_USER_TABLE_REGEX,
-      HRegionInfo.ENCODED_REGION_NAME_REGEX, StoreFile.HFILE_NAME_REGEX);
+      HRegionInfo.ENCODED_REGION_NAME_REGEX, StoreFileInfo.HFILE_NAME_REGEX);
 
   /** Define the HFile Link name parser in the form of: table=region-hfile */
   private static final Pattern LINK_NAME_PATTERN =
     Pattern.compile(String.format("^(%s)=(%s)-(%s)$", HTableDescriptor.VALID_USER_TABLE_REGEX,
-      HRegionInfo.ENCODED_REGION_NAME_REGEX, StoreFile.HFILE_NAME_REGEX));
+      HRegionInfo.ENCODED_REGION_NAME_REGEX, StoreFileInfo.HFILE_NAME_REGEX));
 
   /**
    * The pattern should be used for hfile and reference links

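With the hfile-name portion of the regex now coming from StoreFileInfo, a link name still follows the table=region-hfile convention. A quick sketch of checking a name through the public isHFileLink helper (the name below is invented):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.HFileLink;

    public class LinkNameSketch {
      public static void main(String[] args) {
        // table=encodedRegionName-hfileName, all made-up values
        Path p = new Path("mytable=a1b2c3d4e5f60718293a4b5c6d7e8f90-abcdef0123456789");
        System.out.println(HFileLink.isHFileLink(p));  // expected: true
      }
    }
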
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java Tue Mar  5 18:25:44 2013
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
@@ -85,16 +86,18 @@ public class HalfStoreFileReader extends
    * Creates a half file reader for a hfile referred to by an hfilelink.
    * @param fs fileystem to read from
    * @param p path to hfile
-   * @param link
+   * @param in {@link FSDataInputStream} opened on the hfile
+   * @param inNoChecksum {@link FSDataInputStream} opened on a filesystem without checksum verification
+   * @param size Full size of the hfile
    * @param cacheConf
    * @param r original reference file (contains top or bottom)
    * @param preferredEncodingInCache
    * @throws IOException
    */
-  public HalfStoreFileReader(final FileSystem fs, final Path p, final HFileLink link,
-      final CacheConfig cacheConf, final Reference r,
-      DataBlockEncoding preferredEncodingInCache) throws IOException {
-    super(fs, p, link, link.getFileStatus(fs).getLen(), cacheConf, preferredEncodingInCache, true);
+  public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStream in,
+      final FSDataInputStream inNoChecksum, long size, final CacheConfig cacheConf,
+      final Reference r, final DataBlockEncoding preferredEncodingInCache) throws IOException {
+    super(fs, p, in, inNoChecksum, size, cacheConf, preferredEncodingInCache, true);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't

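The reader now takes pre-opened streams plus the full file length instead of an HFileLink. A minimal sketch, assuming the reference file and cache config are already in hand; passing the same stream for both arguments when no separate checksum-less filesystem is in play is an assumption of this sketch, not something the patch states:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.HalfStoreFileReader;
    import org.apache.hadoop.hbase.io.Reference;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    class HalfReaderSketch {
      static HalfStoreFileReader open(FileSystem fs, Path p, CacheConfig cacheConf,
          Reference r) throws IOException {
        FSDataInputStream in = fs.open(p);
        long size = fs.getFileStatus(p).getLen();
        return new HalfStoreFileReader(fs, p, in, in, size, cacheConf, r,
            DataBlockEncoding.NONE);
      }
    }
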
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java Tue Mar  5 18:25:44 2013
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.catalog.M
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -301,14 +301,13 @@ class CatalogJanitor extends Chore {
     HTableDescriptor parentDescriptor = getTableDescriptor(parent.getTableName());
 
     for (HColumnDescriptor family: parentDescriptor.getFamilies()) {
-      Path p = HStore.getStoreHomedir(tabledir, daughter.getEncodedName(),
-        family.getName());
+      Path p = HStore.getStoreHomedir(tabledir, daughter, family.getName());
       if (!fs.exists(p)) continue;
       // Look for reference files.  Call listStatus with anonymous instance of PathFilter.
       FileStatus [] ps = FSUtils.listStatus(fs, p,
           new PathFilter () {
             public boolean accept(Path path) {
-              return StoreFile.isReference(path);
+              return StoreFileInfo.isReference(path);
             }
           }
       );

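The reference-file scan above boils down to a listStatus with a StoreFileInfo-based filter. The same check in isolation, as a sketch (familyDir is any column-family directory):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
    import org.apache.hadoop.hbase.util.FSUtils;

    class ReferenceScanSketch {
      static boolean hasReferences(FileSystem fs, Path familyDir) throws IOException {
        FileStatus[] refs = FSUtils.listStatus(fs, familyDir, new PathFilter() {
          public boolean accept(Path path) {
            return StoreFileInfo.isReference(path);
          }
        });
        // FSUtils.listStatus returns null for an empty or missing directory
        return refs != null && refs.length > 0;
      }
    }
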
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java Tue Mar  5 18:25:44 2013
@@ -125,8 +125,7 @@ class RegionLocationFinder {
       HTableDescriptor tableDescriptor = getTableDescriptor(region.getTableName());
       if (tableDescriptor != null) {
         HDFSBlocksDistribution blocksDistribution =
-            HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor,
-              region.getEncodedName());
+            HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
         List<String> topHosts = blocksDistribution.getTopHosts();
         topServerNames = mapHostNameToServerName(topHosts);
       }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java Tue Mar  5 18:25:44 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
  * folder that are deletable for each HFile cleaner in the chain.
@@ -52,7 +52,7 @@ public class HFileCleaner extends Cleane
     if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
       return true;
     }
-    return StoreFile.validateStoreFileName(file.getName());
+    return StoreFileInfo.validateStoreFileName(file.getName());
   }
 
   /**

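isFileDeletable now delegates the name check to StoreFileInfo. A small sketch of what that check accepts and rejects (example names invented):

    import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

    class NameCheckSketch {
      public static void main(String[] args) {
        // A plain hfile name (hex) should validate...
        System.out.println(StoreFileInfo.validateStoreFileName("abcdef0123456789"));
        // ...while arbitrary file names should not.
        System.out.println(StoreFileInfo.validateStoreFileName("readme.txt"));
      }
    }
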
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java Tue Mar  5 18:25:44 2013
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.master.Ma
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.exceptions.CorruptedSnapshotException;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.TakeSnapshotUtils;
@@ -207,7 +207,7 @@ public final class MasterSnapshotVerifie
       Path archivedCfDir = new Path(archivedRegion, cf.getPath().getName());
       for (FileStatus hfile : hfiles) {
         // make sure the name is correct
-        if (!StoreFile.validateStoreFileName(hfile.getPath().getName())) {
+        if (!StoreFileInfo.validateStoreFileName(hfile.getPath().getName())) {
           throw new CorruptedSnapshotException("HFile: " + hfile.getPath()
               + " is not a valid hfile name.", snapshot);
         }
@@ -224,4 +224,4 @@ public final class MasterSnapshotVerifie
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Tue Mar  5 18:25:44 2013
@@ -112,12 +112,12 @@ public class CompactionTool extends Conf
         Path regionDir = path.getParent();
         Path tableDir = regionDir.getParent();
         HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
-        HRegion region = loadRegion(fs, conf, htd, regionDir);
-        compactStoreFiles(region, path, compactOnce);
+        HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce);
       } else if (isRegionDir(fs, path)) {
         Path tableDir = path.getParent();
         HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
-        compactRegion(htd, path, compactOnce);
+        compactRegion(tableDir, htd, path, compactOnce);
       } else if (isTableDir(fs, path)) {
         compactTable(path, compactOnce);
       } else {
@@ -129,19 +129,16 @@ public class CompactionTool extends Conf
     private void compactTable(final Path tableDir, final boolean compactOnce)
         throws IOException {
       HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
-      LOG.info("Compact table=" + htd.getNameAsString());
       for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
-        compactRegion(htd, regionDir, compactOnce);
+        compactRegion(tableDir, htd, regionDir, compactOnce);
       }
     }
 
-    private void compactRegion(final HTableDescriptor htd, final Path regionDir,
-        final boolean compactOnce) throws IOException {
-      HRegion region = loadRegion(fs, conf, htd, regionDir);
-      LOG.info("Compact table=" + htd.getNameAsString() +
-        " region=" + region.getRegionNameAsString());
+    private void compactRegion(final Path tableDir, final HTableDescriptor htd,
+        final Path regionDir, final boolean compactOnce) throws IOException {
+      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
       for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
-        compactStoreFiles(region, familyDir, compactOnce);
+        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce);
       }
     }
 
@@ -150,12 +147,13 @@ public class CompactionTool extends Conf
      * If the compact once flag is not specified, execute the compaction until
      * no more compactions are needed. Uses the Configuration settings provided.
      */
-    private void compactStoreFiles(final HRegion region, final Path familyDir,
-        final boolean compactOnce) throws IOException {
-      LOG.info("Compact table=" + region.getTableDesc().getNameAsString() +
-        " region=" + region.getRegionNameAsString() +
-        " family=" + familyDir.getName());
-      HStore store = getStore(region, familyDir);
+    private void compactStoreFiles(final Path tableDir, final HTableDescriptor htd,
+        final HRegionInfo hri, final String familyName, final boolean compactOnce)
+        throws IOException {
+      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
+      LOG.info("Compact table=" + htd.getNameAsString() +
+        " region=" + hri.getRegionNameAsString() +
+        " family=" + familyName);
       do {
         CompactionContext compaction = store.requestCompaction();
         if (compaction == null) break;
@@ -174,29 +172,17 @@ public class CompactionTool extends Conf
      * Create a "mock" HStore that uses the tmpDir specified by the user and
      * the store dir to compact as source.
      */
-    private HStore getStore(final HRegion region, final Path storeDir) throws IOException {
-      byte[] familyName = Bytes.toBytes(storeDir.getName());
-      HColumnDescriptor hcd = region.getTableDesc().getFamily(familyName);
-      // Create a Store w/ check of hbase.rootdir blanked out and return our
-      // list of files instead of have Store search its home dir.
-      return new HStore(tmpDir, region, hcd, fs, conf) {
-        @Override
-        public FileStatus[] getStoreFiles() throws IOException {
-          return this.fs.listStatus(getHomedir());
-        }
-
+    private static HStore getStore(final Configuration conf, final FileSystem fs,
+        final Path tableDir, final HTableDescriptor htd, final HRegionInfo hri,
+        final String familyName, final Path tempDir) throws IOException {
+      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
         @Override
-        Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
-          return storeDir;
+        public Path getTempDir() {
+          return tempDir;
         }
       };
-    }
-
-    private static HRegion loadRegion(final FileSystem fs, final Configuration conf,
-        final HTableDescriptor htd, final Path regionDir) throws IOException {
-      Path rootDir = regionDir.getParent().getParent();
-      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
-      return HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true);
+      HRegion region = new HRegion(regionFs, null, conf, htd, null);
+      return new HStore(region, htd.getFamily(Bytes.toBytes(familyName)), conf);
     }
   }
 

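The interesting move above is the anonymous HRegionFileSystem with only getTempDir() overridden, so compaction output is redirected to a user-chosen scratch directory while the store itself is read in place. The pattern in isolation, as a sketch (placed in the regionserver package, since the constructor may not be visible elsewhere):

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HRegionInfo;

    class TempDirSketch {
      static HRegionFileSystem withTempDir(Configuration conf, FileSystem fs,
          Path tableDir, HRegionInfo hri, final Path tempDir) {
        return new HRegionFileSystem(conf, fs, tableDir, hri) {
          @Override
          public Path getTempDir() {
            return tempDir;  // everything else still resolves normally
          }
        };
      }
    }
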
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Mar  5 18:25:44 2013
@@ -633,7 +633,7 @@ public class HRegion implements HeapSize
         status.setStatus("Instantiating store for column family " + family);
         completionService.submit(new Callable<HStore>() {
           public HStore call() throws IOException {
-            return instantiateHStore(getTableDir(), family);
+            return instantiateHStore(family);
           }
         });
       }
@@ -706,32 +706,23 @@ public class HRegion implements HeapSize
    * This is a helper function to compute HDFS block distribution on demand
    * @param conf configuration
    * @param tableDescriptor HTableDescriptor of the table
-   * @param regionEncodedName encoded name of the region
+   * @param regionInfo {@link HRegionInfo} of the region
    * @return The HDFS blocks distribution for the given region.
- * @throws IOException
+   * @throws IOException
    */
-  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
-    Configuration conf, HTableDescriptor tableDescriptor,
-    String regionEncodedName) throws IOException {
-    HDFSBlocksDistribution hdfsBlocksDistribution =
-      new HDFSBlocksDistribution();
-    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
-      tableDescriptor.getName());
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
+      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
+    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf), tableDescriptor.getName());
     FileSystem fs = tablePath.getFileSystem(conf);
 
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
-      Path storeHomeDir = HStore.getStoreHomedir(tablePath, regionEncodedName,
-      family.getName());
-      if (!fs.exists(storeHomeDir))continue;
-
-      FileStatus[] hfilesStatus = null;
-      hfilesStatus = fs.listStatus(storeHomeDir);
-
-      for (FileStatus hfileStatus : hfilesStatus) {
-        HDFSBlocksDistribution storeFileBlocksDistribution =
-          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0,
-          hfileStatus.getLen());
-        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
+      if (storeFiles == null) continue;
+
+      for (StoreFileInfo storeFileInfo : storeFiles) {
+        hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
       }
     }
     return hdfsBlocksDistribution;
@@ -1121,14 +1112,6 @@ public class HRegion implements HeapSize
   void doRegionCompactionPrep() throws IOException {
   }
 
-  /**
-   * Get the temporary directory for this region. This directory
-   * will have its contents removed when the region is reopened.
-   */
-  Path getTmpDir() {
-    return fs.getTempDir();
-  }
-
   void triggerMajorCompaction() {
     for (Store h : stores.values()) {
       h.triggerMajorCompaction();
@@ -2377,8 +2360,7 @@ public class HRegion implements HeapSize
     // files/batch, far more than the number of store files under a single column family.
     for (Store store : stores.values()) {
       // 2.1. build the snapshot reference directory for the store
-      Path dstStoreDir = TakeSnapshotUtils.getStoreSnapshotDirectory(
-        snapshotRegionFs.getRegionDir(), Bytes.toString(store.getFamily().getName()));
+      Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
       List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
@@ -2965,9 +2947,8 @@ public class HRegion implements HeapSize
     return true;
   }
 
-  protected HStore instantiateHStore(Path tableDir, HColumnDescriptor c)
-      throws IOException {
-    return new HStore(tableDir, this, c, this.getFilesystem(), this.conf);
+  protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+    return new HStore(this, family, this.conf);
   }
 
   /**
@@ -4171,13 +4152,13 @@ public class HRegion implements HeapSize
    * @param colFamily the column family
    * @throws IOException
    */
-  public static void makeColumnFamilyDirs(FileSystem fs, Path tabledir,
-    final HRegionInfo hri, byte [] colFamily)
-  throws IOException {
-    Path dir = HStore.getStoreHomedir(tabledir, hri.getEncodedName(), colFamily);
+  private static Path makeColumnFamilyDirs(FileSystem fs, Path tabledir,
+    final HRegionInfo hri, byte [] colFamily) throws IOException {
+    Path dir = HStore.getStoreHomedir(tabledir, hri, colFamily);
     if (!fs.mkdirs(dir)) {
       LOG.warn("Failed to create " + dir);
     }
+    return dir;
   }
 
   /**
@@ -4299,14 +4280,12 @@ public class HRegion implements HeapSize
     byFamily = filesByFamily(byFamily, b.close());
     for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
       byte [] colFamily = es.getKey();
-      makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
+      Path storeDir = makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
       // Because we compacted the source regions we should have no more than two
       // HStoreFiles per family and there will be no reference store
       List<StoreFile> srcFiles = es.getValue();
       for (StoreFile hsf: srcFiles) {
-        StoreFile.rename(fs, hsf.getPath(),
-          StoreFile.getUniqueFile(fs, HStore.getStoreHomedir(tableDir,
-            newRegionInfo.getEncodedName(), colFamily)));
+        StoreFile.rename(fs, hsf.getPath(), StoreFile.getUniqueFile(fs, storeDir));
       }
     }
     if (LOG.isDebugEnabled()) {

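computeHDFSBlocksDistribution now walks store files through HRegionFileSystem instead of raw listStatus calls. A minimal caller-side sketch (conf, htd, and hri are assumed to be in hand):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HDFSBlocksDistribution;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    class BlockLocalitySketch {
      static void printTopHosts(Configuration conf, HTableDescriptor htd,
          HRegionInfo hri) throws IOException {
        HDFSBlocksDistribution dist =
            HRegion.computeHDFSBlocksDistribution(conf, htd, hri);
        System.out.println("top hosts: " + dist.getTopHosts());
      }
    }
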
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java Tue Mar  5 18:25:44 2013
@@ -22,6 +22,9 @@ package org.apache.hadoop.hbase.regionse
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +41,6 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -118,6 +120,214 @@ public class HRegionFileSystem {
   }
 
   // ===========================================================================
+  //  Store/StoreFile Helpers
+  // ===========================================================================
+  /**
+   * Returns the directory path of the specified family
+   * @param familyName Column Family Name
+   * @return {@link Path} to the directory of the specified family
+   */
+  Path getStoreDir(final String familyName) {
+    return new Path(this.getRegionDir(), familyName);
+  }
+
+  /**
+   * Create the store directory for the specified family name
+   * @param familyName Column Family Name
+   * @return {@link Path} to the directory of the specified family
+   * @throws IOException if the directory creation fails.
+   */
+  public Path createStoreDir(final String familyName) throws IOException {
+    Path storeDir = getStoreDir(familyName);
+    if (!fs.exists(storeDir) && !fs.mkdirs(storeDir)) {
+      throw new IOException("Failed create of: " + storeDir);
+    }
+    return storeDir;
+  }
+
+  /**
+   * Returns the store files available for the family.
+   * This method filters out invalid store files.
+   * @param familyName Column Family Name
+   * @return a set of {@link StoreFileInfo} for the specified family.
+   */
+  public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
+    return getStoreFiles(Bytes.toString(familyName));
+  }
+
+  /**
+   * Returns the store files available for the family.
+   * This method filters out invalid store files.
+   * @param familyName Column Family Name
+   * @return a set of {@link StoreFileInfo} for the specified family.
+   */
+  public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
+    Path familyDir = getStoreDir(familyName);
+    FileStatus[] files = FSUtils.listStatus(this.fs, familyDir);
+    if (files == null) return null;
+
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
+    for (FileStatus status: files) {
+      if (!StoreFileInfo.isValid(status)) continue;
+
+      storeFiles.add(new StoreFileInfo(this.conf, this.fs, status));
+    }
+    return storeFiles;
+  }
+
+  /**
+   * @return the set of families present on disk
+   */
+  public Collection<String> getFamilies() throws IOException {
+    FileStatus[] fds = FSUtils.listStatus(fs, getRegionDir(), new FSUtils.FamilyDirFilter(fs));
+    if (fds == null) return null;
+
+    ArrayList<String> families = new ArrayList<String>(fds.length);
+    for (FileStatus status: fds) {
+      families.add(status.getPath().getName());
+    }
+
+    return families;
+  }
+
+  /**
+   * Generate a unique file name, used by createTempName() and commitStoreFile()
+   * @param suffix extra information to append to the generated name
+   * @return Unique file name
+   */
+  private static String generateUniqueName(final String suffix) {
+    String name = UUID.randomUUID().toString().replaceAll("-", "");
+    if (suffix != null) name += suffix;
+    return name;
+  }
+
+  /**
+   * Generate a unique temporary Path. Used in conjunction with commitStoreFile()
+   * for safer file creation.
+   * <code>
+   * Path file = fs.createTempName();
+   * ...StoreFile.Writer(file)...
+   * fs.commitStoreFile("family", file);
+   * </code>
+   *
+   * @return Unique {@link Path} of the temporary file
+   */
+  public Path createTempName() {
+    return createTempName(null);
+  }
+
+  /**
+   * Generate a unique temporary Path. Used in conjunction with commitStoreFile()
+   * for safer file creation.
+   * <code>
+   * Path file = fs.createTempName();
+   * ...StoreFile.Writer(file)...
+   * fs.commitStoreFile("family", file);
+   * </code>
+   *
+   * @param suffix extra information to append to the generated name
+   * @return Unique {@link Path} of the temporary file
+   */
+  public Path createTempName(final String suffix) {
+    return new Path(getTempDir(), generateUniqueName(suffix));
+  }
+
+  /**
+   * Move the file from a build/temp location to the main family store directory.
+   * @param familyName Family that will gain the file
+   * @param buildPath {@link Path} to the file to commit.
+   * @return The new {@link Path} of the committed file
+   * @throws IOException
+   */
+  public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
+    return commitStoreFile(familyName, buildPath, -1, false);
+  }
+
+  /**
+   * Move the file from a build/temp location to the main family store directory.
+   * @param familyName Family that will gain the file
+   * @param buildPath {@link Path} to the file to commit.
+   * @param seqNum Sequence Number to append to the file name (less than 0 if no sequence number)
+   * @param generateNewName False if you want to keep the buildPath name
+   * @return The new {@link Path} of the committed file
+   * @throws IOException
+   */
+  public Path commitStoreFile(final String familyName, final Path buildPath,
+      final long seqNum, final boolean generateNewName) throws IOException {
+    Path storeDir = getStoreDir(familyName);
+    fs.mkdirs(storeDir);
+    String name = buildPath.getName();
+    if (generateNewName) {
+      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
+    }
+    Path dstPath = new Path(storeDir, name);
+    if (!fs.exists(buildPath)) {
+      throw new FileNotFoundException(buildPath.toString());
+    }
+    LOG.debug("Committing store file " + buildPath + " as " + dstPath);
+    if (!fs.rename(buildPath, dstPath)) {
+      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
+    }
+    return dstPath;
+  }
+
+  /**
+   * Archives the specified store file from the specified family.
+   * @param familyName Family that contains the store files
+   * @param filePath {@link Path} to the store file to remove
+   * @throws IOException if the archiving fails
+   */
+  public void removeStoreFile(final String familyName, final Path filePath)
+      throws IOException {
+    HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfo,
+        this.tableDir, Bytes.toBytes(familyName), filePath);
+  }
+
+  /**
+   * Closes and archives the specified store files from the specified family.
+   * @param familyName Family that contains the store files
+   * @param storeFiles set of store files to remove
+   * @throws IOException if the archiving fails
+   */
+  public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
+      throws IOException {
+    HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfo,
+        this.tableDir, Bytes.toBytes(familyName), storeFiles);
+  }
+
+  /**
+   * Bulk load: Add a specified store file to the specified family.
+   * If the source file is on the same file-system as the destination store, it
+   * is moved from the source location to the destination location; otherwise
+   * it is copied over.
+   *
+   * @param familyName Family that will gain the file
+   * @param srcPath {@link Path} to the file to import
+   * @param seqNum Bulk Load sequence number
+   * @return The destination {@link Path} of the bulk loaded file
+   * @throws IOException
+   */
+  public Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
+      throws IOException {
+    // Copy the file if it's on another filesystem
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
+
+    // We can't compare FileSystem instances as equals() includes UGI instance
+    // as part of the comparison and won't work when doing SecureBulkLoad
+    // TODO deal with viewFS
+    if (!srcFs.getUri().equals(desFs.getUri())) {
+      LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
+          "the destination store. Copying file over to destination filesystem.");
+      Path tmpPath = createTempName();
+      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
+      LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
+      srcPath = tmpPath;
+    }
+
+    return commitStoreFile(familyName, srcPath, seqNum, true);
+  }
+
+  // ===========================================================================
   //  Splits Helpers
   // ===========================================================================
   /** @return {@link Path} to the temp directory used during split operations */
@@ -359,4 +569,4 @@ public class HRegionFileSystem {
       LOG.warn("Failed delete of " + regionDir);
     }
   }
-}
\ No newline at end of file
+}

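Taken together, createTempName()/commitStoreFile() give flushes and compactions a write-then-rename protocol, and bulkLoadStoreFile() layers the cross-filesystem copy on top of the same commit step. An end-to-end sketch (family name, staging path, and sequence number are illustrative; the actual hfile writing is elided):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;

    class StoreFileFlowSketch {
      static void flow(HRegionFileSystem regionFs) throws IOException {
        // 1. Reserve a unique name under the region's temp dir.
        Path tmp = regionFs.createTempName();
        // 2. ... write the new hfile at 'tmp' with a StoreFile.Writer (elided) ...
        // 3. Rename it into the family directory.
        Path committed = regionFs.commitStoreFile("cf", tmp);
        // Bulk load: copies first if the source is on another filesystem,
        // then commits under a "_SeqId_<n>_" suffixed unique name.
        Path loaded = regionFs.bulkLoadStoreFile("cf", new Path("/staging/hfile1"), 42L);
      }
    }
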
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Mar  5 18:25:44 2013
@@ -41,9 +41,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompoundConfiguration;
@@ -52,11 +50,9 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.exceptions.WrongRegionException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -77,7 +73,6 @@ import org.apache.hadoop.hbase.util.Chec
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -116,15 +111,12 @@ public class HStore implements Store {
 
   protected final MemStore memstore;
   // This stores directory in the filesystem.
-  private final Path homedir;
   private final HRegion region;
   private final HColumnDescriptor family;
-  CompactionPolicy compactionPolicy;
-  final FileSystem fs;
-  final Configuration conf;
-  final CacheConfig cacheConf;
-  // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
-  private long ttl;
+  final CompactionPolicy compactionPolicy;
+  private final HRegionFileSystem fs;
+  private final Configuration conf;
+  private final CacheConfig cacheConf;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
@@ -137,7 +129,7 @@ public class HStore implements Store {
 
   private ScanInfo scanInfo;
 
-  private StoreFileManager storeFileManager;
+  private final StoreFileManager storeFileManager;
   final List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
@@ -164,25 +156,20 @@ public class HStore implements Store {
 
   /**
    * Constructor
-   * @param basedir qualified path under which the region directory lives;
-   * generally the table subdirectory
    * @param region
    * @param family HColumnDescriptor for this column
-   * @param fs file system object
    * @param confParam configuration object
    * @throws IOException
    */
-  protected HStore(Path basedir, HRegion region, HColumnDescriptor family,
-      FileSystem fs, Configuration confParam)
-  throws IOException {
+  protected HStore(final HRegion region, final HColumnDescriptor family,
+      final Configuration confParam) throws IOException {
 
     HRegionInfo info = region.getRegionInfo();
-    this.fs = fs;
-    // Assemble the store's home directory.
-    Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
-    // Ensure it exists.
-    this.homedir = createStoreHomeDir(this.fs, p);
+    this.fs = region.getRegionFileSystem();
+
+    // Assemble the store's home directory and ensure it exists.
+    fs.createStoreDir(family.getNameAsString());
     this.region = region;
     this.family = family;
     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
@@ -200,13 +187,13 @@ public class HStore implements Store {
             family.getDataBlockEncoding());
 
     this.comparator = info.getComparator();
-    // Get TTL
-    this.ttl = determineTTLFromFamily(family);
     // used by ScanQueryMatcher
     long timeToPurgeDeletes =
         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
         "ms in store " + this);
+    // Get TTL
+    long ttl = determineTTLFromFamily(family);
     // Why not just pass a HColumnDescriptor in here altogether?  Even if have
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
@@ -250,7 +237,7 @@ public class HStore implements Store {
 
   /**
    * @param family
-   * @return
+   * @return TTL in milliseconds of the specified family
    */
   private static long determineTTLFromFamily(final HColumnDescriptor family) {
     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
@@ -273,27 +260,15 @@ public class HStore implements Store {
 
   @Override
   public String getTableName() {
-    return this.region.getTableDesc().getNameAsString();
-  }
-
-  /**
-   * Create this store's homedir
-   * @param fs
-   * @param homedir
-   * @return Return <code>homedir</code>
-   * @throws IOException
-   */
-  Path createStoreHomeDir(final FileSystem fs,
-      final Path homedir) throws IOException {
-    if (!fs.exists(homedir)) {
-      if (!fs.mkdirs(homedir))
-        throw new IOException("Failed create of: " + homedir.toString());
-    }
-    return homedir;
+    return this.getRegionInfo().getTableNameAsString();
   }
 
   @Override
   public FileSystem getFileSystem() {
+    return this.fs.getFileSystem();
+  }
+
+  public HRegionFileSystem getRegionFileSystem() {
     return this.fs;
   }
 
@@ -301,7 +276,7 @@ public class HStore implements Store {
   @Override
   public long getStoreFileTtl() {
     // TTL only applies if there's no MIN_VERSIONs setting on the column.
-    return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
+    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
   }
 
   @Override
@@ -358,42 +333,27 @@ public class HStore implements Store {
   }
 
   /**
-   * @param tabledir
-   * @param encodedName Encoded region name.
-   * @param family
+   * @param tabledir {@link Path} to where the table is being stored
+   * @param hri {@link HRegionInfo} for the region.
+   * @param family column family name
    * @return Path to family/Store home directory.
    */
+  @Deprecated
   public static Path getStoreHomedir(final Path tabledir,
-      final String encodedName, final byte [] family) {
-    return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
+      final HRegionInfo hri, final byte[] family) {
+    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
   }
 
   /**
-   * @param tabledir
+   * @param tabledir {@link Path} to where the table is being stored
    * @param encodedName Encoded region name.
-   * @param family
+   * @param family column family name
    * @return Path to family/Store home directory.
    */
+  @Deprecated
   public static Path getStoreHomedir(final Path tabledir,
-      final String encodedName, final String family) {
-    return new Path(tabledir, new Path(encodedName, new Path(family)));
-  }
-
-  /**
-   * @param parentRegionDirectory directory for the parent region
-   * @param family family name of this store
-   * @return Path to the family/Store home directory
-   */
-  public static Path getStoreHomedir(final Path parentRegionDirectory,
-      final byte[] family) {
-    return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
-  }
-  /**
-   * Return the directory in which this store stores its
-   * StoreFiles
-   */
-  Path getHomedir() {
-    return homedir;
+      final String encodedName, final byte[] family) {
+    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
   }
 
   @Override
@@ -409,48 +369,31 @@ public class HStore implements Store {
     this.dataBlockEncoder = blockEncoder;
   }
 
-  FileStatus[] getStoreFiles() throws IOException {
-    return FSUtils.listStatus(this.fs, this.homedir, null);
-  }
-
   /**
    * Creates an unsorted list of StoreFile loaded in parallel
    * from the given directory.
    * @throws IOException
    */
   private List<StoreFile> loadStoreFiles() throws IOException {
-    ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-    FileStatus files[] = getStoreFiles();
-
-    if (files == null || files.length == 0) {
-      return results;
+    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
+    if (files == null || files.size() == 0) {
+      return new ArrayList<StoreFile>();
     }
+
     // initialize the thread pool for opening store files in parallel..
     ThreadPoolExecutor storeFileOpenerThreadPool =
       this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
-          this.family.getNameAsString());
+          this.getColumnFamilyName());
     CompletionService<StoreFile> completionService =
       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
 
     int totalValidStoreFile = 0;
-    for (int i = 0; i < files.length; i++) {
-      // Skip directories.
-      if (files[i].isDir()) {
-        continue;
-      }
-      final Path p = files[i].getPath();
-      // Check for empty hfile. Should never be the case but can happen
-      // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
-      // NOTE: that the HFileLink is just a name, so it's an empty file.
-      if (!HFileLink.isHFileLink(p) && this.fs.getFileStatus(p).getLen() <= 0) {
-        LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
-        continue;
-      }
-
+    final FileSystem fs = this.getFileSystem();
+    for (final StoreFileInfo storeFileInfo: files) {
       // open each store file in parallel
       completionService.submit(new Callable<StoreFile>() {
         public StoreFile call() throws IOException {
-          StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
+          StoreFile storeFile = new StoreFile(fs, storeFileInfo.getPath(), conf, cacheConf,
               family.getBloomFilterType(), dataBlockEncoder);
           storeFile.createReader();
           return storeFile;
@@ -459,6 +402,7 @@ public class HStore implements Store {
       totalValidStoreFile++;
     }
 
+    ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
     IOException ioe = null;
     try {
       for (int i = 0; i < totalValidStoreFile; i++) {
@@ -477,8 +421,8 @@ public class HStore implements Store {
           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
           if (ioe == null) ioe = new IOException(e.getCause());
-        } 
-      } 
+        }
+      }
     } finally {
       storeFileOpenerThreadPool.shutdownNow();
     }
@@ -543,7 +487,7 @@ public class HStore implements Store {
     HFile.Reader reader  = null;
     try {
       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
-          + "store " + this + " region " + this.region);
+          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
       reader = HFile.createReader(srcPath.getFileSystem(conf),
           srcPath, cacheConf);
       reader.loadFileInfo();
@@ -557,14 +501,13 @@ public class HStore implements Store {
       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
           " last=" + Bytes.toStringBinary(lastKey));
       LOG.debug("Region bounds: first=" +
-          Bytes.toStringBinary(region.getStartKey()) +
-          " last=" + Bytes.toStringBinary(region.getEndKey()));
+          Bytes.toStringBinary(getRegionInfo().getStartKey()) +
+          " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
 
-      HRegionInfo hri = region.getRegionInfo();
-      if (!hri.containsRange(firstKey, lastKey)) {
+      if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
         throw new WrongRegionException(
             "Bulk load file " + srcPath.toString() + " does not fit inside region "
-            + this.region);
+            + this.getRegionInfo().getRegionNameAsString());
       }
 
       if (verifyBulkLoads) {
@@ -602,38 +545,17 @@ public class HStore implements Store {
   @Override
   public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
+    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
 
-    // Copy the file if it's on another filesystem
-    FileSystem srcFs = srcPath.getFileSystem(conf);
-    FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
-    //We can't compare FileSystem instances as
-    //equals() includes UGI instance as part of the comparison
-    //and won't work when doing SecureBulkLoad
-    //TODO deal with viewFS
-    if (!srcFs.getUri().equals(desFs.getUri())) {
-      LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
-          "the destination store. Copying file over to destination filesystem.");
-      Path tmpPath = getTmpPath();
-      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
-      LOG.info("Copied " + srcPath
-          + " to temporary path on destination filesystem: " + tmpPath);
-      srcPath = tmpPath;
-    }
-
-    Path dstPath = StoreFile.getRandomFilename(fs, homedir,
-        (seqNum == -1) ? null : "_SeqId_" + seqNum + "_");
-    LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
-    StoreFile.rename(fs, srcPath, dstPath);
-
-    StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
+    StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), this.dataBlockEncoder);
 
     StoreFile.Reader r = sf.createReader();
     this.storeSize += r.length();
     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
 
-    LOG.info("Moved HFile " + srcPath + " into store directory " +
-        homedir + " - updating store file list.");
+    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
+        "' as " + dstPath + " - updating store file list.");
 
     // Append the new storefile into the list
     this.lock.writeLock().lock();
@@ -652,16 +574,6 @@ public class HStore implements Store {
         + " into store " + this + " (new location: " + dstPath + ")");
   }
 
-  /**
-   * Get a temporary path in this region. These temporary files
-   * will get cleaned up when the region is re-opened if they are
-   * still around.
-   */
-  private Path getTmpPath() throws IOException {
-    return StoreFile.getRandomFilename(
-        fs, region.getTmpDir());
-  }
-
   @Override
   public ImmutableCollection<StoreFile> close() throws IOException {
     this.lock.writeLock().lock();
@@ -673,7 +585,7 @@ public class HStore implements Store {
         // initialize the thread pool for closing store files in parallel.
         ThreadPoolExecutor storeFileCloserThreadPool = this.region
             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
-                + this.family.getNameAsString());
+                + this.getColumnFamilyName());
 
         // close each store file in parallel
         CompletionService<Void> completionService =
@@ -809,20 +721,19 @@ public class HStore implements Store {
     // treat this as a minor compaction.
     InternalScanner scanner = null;
     KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
-    if (this.region.getCoprocessorHost() != null) {
-      scanner = this.region.getCoprocessorHost()
-          .preFlushScannerOpen(this, memstoreScanner);
+    if (this.getCoprocessorHost() != null) {
+      scanner = this.getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
     }
     if (scanner == null) {
       Scan scan = new Scan();
       scan.setMaxVersions(scanInfo.getMaxVersions());
       scanner = new StoreScanner(this, scanInfo, scan,
           Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
-          this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
     }
-    if (this.region.getCoprocessorHost() != null) {
+    if (this.getCoprocessorHost() != null) {
       InternalScanner cpScanner =
-        this.region.getCoprocessorHost().preFlush(this, scanner);
+        this.getCoprocessorHost().preFlush(this, scanner);
       // NULL scanner returned from coprocessor hooks means skip normal processing
       if (cpScanner == null) {
         return null;
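
Two things change in the flush path above: the coprocessor host is reached
through the store itself instead of this.region, and the smallest read point
arrives as a parameter rather than being pulled from the region. The hook
protocol is subtle enough to spell out: the pre-open hook may substitute its
own scanner, and a null from preFlush means "skip normal processing". A sketch
of that pattern with hypothetical, trimmed-down types, not the HBase
coprocessor API:

    interface Scanner {}

    interface FlushHooks {
      Scanner preOpen(Scanner defaultScanner); // replacement scanner, or null
      Scanner preFlush(Scanner chosen);        // null vetoes the flush
    }

    final class HookDriver {
      static Scanner selectScanner(FlushHooks hooks, Scanner defaultScanner) {
        Scanner s = (hooks != null) ? hooks.preOpen(defaultScanner) : null;
        if (s == null) {
          s = defaultScanner;                  // fall back to the normal path
        }
        if (hooks != null) {
          s = hooks.preFlush(s);               // may return null to veto
        }
        return s;                              // null => caller skips the flush
      }
    }
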
@@ -898,17 +809,10 @@ public class HStore implements Store {
       MonitoredTask status)
       throws IOException {
     // Write-out finished successfully, move into the right spot
-    String fileName = path.getName();
-    Path dstPath = new Path(homedir, fileName);
-    String msg = "Renaming flushed file at " + path + " to " + dstPath;
-    LOG.debug(msg);
-    status.setStatus("Flushing " + this + ": " + msg);
-    if (!fs.rename(path, dstPath)) {
-      LOG.warn("Unable to rename " + path + " to " + dstPath);
-    }
+    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
 
     status.setStatus("Flushing " + this + ": reopening flushed file");
-    StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
+    StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), this.dataBlockEncoder);
 
     StoreFile.Reader r = sf.createReader();
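
commitFile above stops hand-rolling the rename out of the tmp directory (the
deleted code only logged a warning when the rename failed);
HRegionFileSystem.commitStoreFile(family, path) now performs the move and hands
back the committed location. A minimal sketch of the commit-then-open sequence,
assuming only the calls shown in this hunk:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;

    public final class CommitSketch {
      private CommitSketch() {}

      /** Commits a flushed tmp file and returns where it landed. */
      public static Path commitFlushedFile(HRegionFileSystem regionFs,
          String familyName, Path tmpPath) throws IOException {
        // The returned path is what the StoreFile reader is opened on.
        return regionFs.commitStoreFile(familyName, tmpPath);
      }
    }
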
@@ -927,7 +831,7 @@ public class HStore implements Store {
    * @param maxKeyCount
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  private StoreFile.Writer createWriterInTmp(int maxKeyCount)
+  private StoreFile.Writer createWriterInTmp(long maxKeyCount)
   throws IOException {
     return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
   }
@@ -938,7 +842,7 @@ public class HStore implements Store {
    * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  public StoreFile.Writer createWriterInTmp(int maxKeyCount,
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount,
     Compression.Algorithm compression, boolean isCompaction)
   throws IOException {
     final CacheConfig writerCacheConf;
@@ -950,8 +854,8 @@ public class HStore implements Store {
       writerCacheConf = cacheConf;
     }
     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
-        fs, blocksize)
-            .withOutputDir(region.getTmpDir())
+        this.getFileSystem(), blocksize)
+            .withFilePath(fs.createTempName())
             .withDataBlockEncoder(dataBlockEncoder)
             .withComparator(comparator)
             .withBloomType(family.getBloomFilterType())
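
Two changes land in the writer setup: maxKeyCount widens from int to long (the
Store interface change at the end of this commit matches), and the builder is
given an explicit temp file via withFilePath(fs.createTempName()) instead of a
directory via withOutputDir(region.getTmpDir()), so the store no longer needs
to know where the region keeps its tmp dir. A self-contained illustration of
the unique-temp-name idea; this helper is hypothetical, not the
HRegionFileSystem implementation:

    import java.util.UUID;
    import org.apache.hadoop.fs.Path;

    public final class TempNameSketch {
      private TempNameSketch() {}

      /** A collision-free file name under tmpDir, no coordination needed. */
      public static Path createTempName(Path tmpDir) {
        return new Path(tmpDir, UUID.randomUUID().toString().replace("-", ""));
      }
    }
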
@@ -1084,8 +988,8 @@ public class HStore implements Store {
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
-        + " into tmpdir=" + region.getTmpDir() + ", totalSize="
+        + this + " of " + this.getRegionInfo().getRegionNameAsString()
+        + " into tmpdir=" + fs.getTempDir() + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     List<StoreFile> sfs = new ArrayList<StoreFile>();
@@ -1098,8 +1002,8 @@ public class HStore implements Store {
         for (Path newFile: newFiles) {
           assert newFile != null;
           StoreFile sf = moveFileIntoPlace(newFile);
-          if (region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postCompact(this, sf, cr);
+          if (this.getCoprocessorHost() != null) {
+            this.getCoprocessorHost().postCompact(this, sf, cr);
           }
           assert sf != null;
           sfs.add(sf);
@@ -1108,7 +1012,7 @@ public class HStore implements Store {
       } else {
         for (Path newFile: newFiles) {
           // Create storefile around what we wrote with a reader on it.
-          StoreFile sf = new StoreFile(this.fs, newFile, this.conf, this.cacheConf,
+          StoreFile sf = new StoreFile(this.getFileSystem(), newFile, this.conf, this.cacheConf,
             this.family.getBloomFilterType(), this.dataBlockEncoder);
           sf.createReader();
           sfs.add(sf);
@@ -1133,7 +1037,7 @@ public class HStore implements Store {
     StringBuilder message = new StringBuilder(
       "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
       + cr.getFiles().size() + " file(s) in " + this + " of "
-      + this.region.getRegionInfo().getRegionNameAsString()
+      + this.getRegionInfo().getRegionNameAsString()
       + " into ");
     if (sfs.isEmpty()) {
       message.append("none, ");
@@ -1155,17 +1059,11 @@ public class HStore implements Store {
   }
 
   // Package-visible for tests
-  StoreFile moveFileIntoPlace(Path newFile) throws IOException {
+  StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
     validateStoreFile(newFile);
     // Move the file into the right spot
-    Path destPath = new Path(homedir, newFile.getName());
-    LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
-    if (!fs.rename(newFile, destPath)) {
-      String err = "Failed move of compacted file " + newFile + " to " +  destPath;
-      LOG.error(err);
-      throw new IOException(err);
-    }
-    StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
+    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
+    StoreFile result = new StoreFile(this.getFileSystem(), destPath, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), this.dataBlockEncoder);
     result.createReader();
     return result;
@@ -1215,8 +1113,8 @@ public class HStore implements Store {
       for (Path newFile: newFiles) {
         // Move the compaction into place.
         StoreFile sf = moveFileIntoPlace(newFile);
-        if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompact(this, sf, null);
+        if (this.getCoprocessorHost() != null) {
+          this.getCoprocessorHost().postCompact(this, sf, null);
         }
         ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
         tmp.add(sf);
@@ -1260,7 +1158,7 @@ public class HStore implements Store {
   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException {
     // don't even select for compaction if writes are disabled
-    if (!this.region.areWritesEnabled()) {
+    if (!this.areWritesEnabled()) {
       return null;
     }
 
@@ -1269,9 +1167,9 @@ public class HStore implements Store {
     try {
       synchronized (filesCompacting) {
         // First, see if coprocessor would want to override selection.
-        if (region.getCoprocessorHost() != null) {
+        if (this.getCoprocessorHost() != null) {
           List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
-          boolean override = region.getCoprocessorHost().preCompactSelection(
+          boolean override = this.getCoprocessorHost().preCompactSelection(
               this, candidatesForCoproc, baseRequest);
           if (override) {
             // Coprocessor is overriding normal file selection.
@@ -1291,8 +1189,8 @@ public class HStore implements Store {
             this.offPeakCompactions.endOffPeakRequest();
           }
         }
-        if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompactSelection(
+        if (this.getCoprocessorHost() != null) {
+          this.getCoprocessorHost().postCompactSelection(
               this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
         }
 
@@ -1329,7 +1227,7 @@ public class HStore implements Store {
             (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
         compaction.getRequest().setIsMajor(isMajor);
         compaction.getRequest().setDescription(
-            region.getRegionNameAsString(), getColumnFamilyName());
+            getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
       }
     } finally {
       this.lock.readLock().unlock();
@@ -1366,7 +1264,7 @@ public class HStore implements Store {
       throws IOException {
     StoreFile storeFile = null;
     try {
-      storeFile = new StoreFile(this.fs, path, this.conf,
+      storeFile = new StoreFile(this.getFileSystem(), path, this.conf,
           this.cacheConf, this.family.getBloomFilterType(),
           NoOpDataBlockEncoder.INSTANCE);
       storeFile.createReader();
@@ -1425,8 +1323,7 @@ public class HStore implements Store {
 
       // let the archive util decide if we should archive or delete the files
       LOG.debug("Removing store files after compaction...");
-      HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.region,
-        this.family.getName(), compactedFiles);
+      this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
 
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
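
Cleanup of compacted files moves behind the region filesystem as well: the
direct HFileArchiver.archiveStoreFiles(...) call becomes
fs.removeStoreFiles(family, compactedFiles), so the archive-or-delete decision
is no longer the store's concern. A hypothetical outline of what such a policy
can look like (the real behavior belongs to the archiver and is not shown in
this diff):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class RemovePolicySketch {
      private RemovePolicySketch() {}

      public static void remove(FileSystem fs, Path file, Path archiveDir,
          boolean archiveEnabled) throws IOException {
        if (archiveEnabled) {
          // Park the file in the archive; a cleaner chore expires it later.
          Path dst = new Path(archiveDir, file.getName());
          if (!fs.rename(file, dst)) {
            throw new IOException("Failed to archive " + file + " to " + dst);
          }
        } else if (!fs.delete(file, false)) {
          throw new IOException("Failed to delete " + file);
        }
      }
    }
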
@@ -1475,12 +1372,12 @@ public class HStore implements Store {
     // at all (expired or not) has at least one version that will not expire.
     // Note that this method used to take a KeyValue as arguments. KeyValue
     // can be back-dated, a row key cannot.
-    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl;
+    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
 
     KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
 
     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
-      this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
+      this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
     this.lock.readLock().lock();
     try {
       // First go to the memstore.  Pick up deletes and candidates.
@@ -1634,7 +1531,7 @@ public class HStore implements Store {
     this.lock.readLock().lock();
     try {
       // Should already be enforced by the split policy!
-      assert !this.region.getRegionInfo().isMetaRegion();
+      assert !this.getRegionInfo().isMetaRegion();
       // Not split-able if we find a reference store file present in the store.
       if (hasReferences()) {
         assert false : "getSplitPoint() called on a region that can't split!";
@@ -1677,8 +1574,8 @@ public class HStore implements Store {
     lock.readLock().lock();
     try {
       KeyValueScanner scanner = null;
-      if (this.region.getCoprocessorHost() != null) {
-        scanner = this.region.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
+      if (this.getCoprocessorHost() != null) {
+        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
       }
       if (scanner == null) {
         scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
@@ -1778,7 +1675,7 @@ public class HStore implements Store {
 
   @Override
   public HRegionInfo getRegionInfo() {
-    return this.region.getRegionInfo();
+    return this.fs.getRegionInfo();
   }
 
   @Override
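
This one-line hunk is the pivot of the whole refactor: getRegionInfo() now
answers from the HRegionFileSystem, which is what lets every earlier hunk
replace this.region.getRegionInfo() with this.getRegionInfo(). The delegation
shape, with hypothetical trimmed types rather than HBase API:

    interface RegionInfo { String getRegionNameAsString(); }
    interface RegionFs   { RegionInfo getRegionInfo(); }

    final class StoreLike {
      private final RegionFs fs;     // stands in for HRegionFileSystem

      StoreLike(RegionFs fs) { this.fs = fs; }

      // Region metadata has a single source of truth: the filesystem
      // abstraction, not a back-pointer to the region object.
      RegionInfo getRegionInfo() { return fs.getRegionInfo(); }
    }
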
@@ -1871,10 +1768,8 @@ public class HStore implements Store {
       }
       storeFile = HStore.this.commitFile(storeFilePath, cacheFlushId,
                                snapshotTimeRangeTracker, flushedSize, status);
-      if (HStore.this.region.getCoprocessorHost() != null) {
-        HStore.this.getHRegion()
-            .getCoprocessorHost()
-            .postFlush(HStore.this, storeFile);
+      if (HStore.this.getCoprocessorHost() != null) {
+        HStore.this.getCoprocessorHost().postFlush(HStore.this, storeFile);
       }
 
       // Add new file to store files.  Clear snapshot too while we have
@@ -1894,7 +1789,7 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align((21 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+      ClassSize.align((19 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
               + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD

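The reference count in FIXED_OVERHEAD drops from 21 to 19 because the refactor
leaves HStore with two fewer reference-typed fields (the deleted homedir path
handling above accounts for part of that). This bookkeeping has to track the
declared fields exactly, or HBase's heap-size unit tests flag the mismatch.
Worked arithmetic, assuming 8-byte references on a 64-bit JVM:

    public final class OverheadDelta {
      public static void main(String[] args) {
        // 21 vs 19 references; longs, ints, and the boolean are unchanged.
        long before = 21 * 8 + 4 * 8 + 2 * 4 + 1;
        long after  = 19 * 8 + 4 * 8 + 2 * 4 + 1;
        System.out.println(before - after);  // 16 bytes before alignment
      }
    }
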
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Tue Mar  5 18:25:44 2013
@@ -655,11 +655,9 @@ public class SplitTransaction {
   throws IOException {
     FileSystem fs = this.parent.getFilesystem();
     byte [] family = sf.getFamily();
-    String encoded = this.hri_a.getEncodedName();
-    Path storedir = HStore.getStoreHomedir(splitdir, encoded, family);
+    Path storedir = HStore.getStoreHomedir(splitdir, this.hri_a, family);
     StoreFile.split(fs, storedir, sf, this.splitrow, false);
-    encoded = this.hri_b.getEncodedName();
-    storedir = HStore.getStoreHomedir(splitdir, encoded, family);
+    storedir = HStore.getStoreHomedir(splitdir, this.hri_b, family);
     StoreFile.split(fs, storedir, sf, this.splitrow, true);
   }
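
getStoreHomedir now accepts the HRegionInfo itself, dropping the duplicated
getEncodedName() extraction at both call sites. A hedged reconstruction of what
the new overload likely computes, inferred from the string overload it replaces
(layout: <splitdir>/<encodedName>/<family>):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class StoreDirSketch {
      private StoreDirSketch() {}

      public static Path storeHomedir(Path splitdir, HRegionInfo hri,
          byte[] family) {
        // Region directory keyed by encoded name, family dir beneath it.
        return new Path(new Path(splitdir, hri.getEncodedName()),
            Bytes.toString(family));
      }
    }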
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1452936&r1=1452935&r2=1452936&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Mar  5 18:25:44 2013
@@ -145,7 +145,7 @@ public interface Store extends HeapSize,
    * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  public StoreFile.Writer createWriterInTmp(int maxKeyCount,
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount,
     Compression.Algorithm compression, boolean isCompaction) throws IOException;
 
   // Compaction oriented methods
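
The interface change mirrors the HStore hunks above: maxKeyCount becomes a long
so key-count estimates past Integer.MAX_VALUE (about 2.1 billion) cannot wrap
on the way into the writer. A quick demonstration of the failure mode the
widening avoids:

    public final class OverflowDemo {
      public static void main(String[] args) {
        int narrow = Integer.MAX_VALUE;
        narrow += 1;                          // silently wraps negative
        long wide = (long) Integer.MAX_VALUE + 1;
        System.out.println(narrow);           // -2147483648
        System.out.println(wide);             // 2147483648
      }
    }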