Posted to commits@hbase.apache.org by st...@apache.org on 2012/12/07 00:36:30 UTC

svn commit: r1418131 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions...

Author: stack
Date: Thu Dec  6 23:36:29 2012
New Revision: 1418131

URL: http://svn.apache.org/viewvc?rev=1418131&view=rev
Log:
HBASE-7282 Backport Compaction Tool to 0.94; RETRY

Added:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Dec  6 23:36:29 2012
@@ -645,7 +645,7 @@ public class LruBlockCache implements Bl
     // Log size
     long totalSize = heapSize();
     long freeSize = maxSize - totalSize;
-    LruBlockCache.LOG.debug("LRU Stats: " +
+    LruBlockCache.LOG.debug("Stats: " +
         "total=" + StringUtils.byteDesc(totalSize) + ", " +
         "free=" + StringUtils.byteDesc(freeSize) + ", " +
         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@@ -653,11 +653,11 @@ public class LruBlockCache implements Bl
         "accesses=" + stats.getRequestCount() + ", " +
         "hits=" + stats.getHitCount() + ", " +
         "hitRatio=" +
-          (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) +
+          (stats.getHitCount() == 0 ? "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
         "cachingHits=" + stats.getHitCachingCount() + ", " +
         "cachingHitsRatio=" +
-          (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) +
+          (stats.getHitCachingCount() == 0 ? "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
         "evictions=" + stats.getEvictionCount() + ", " +
         "evicted=" + stats.getEvictedCount() + ", " +
         "evictedPerRun=" + stats.evictedPerEviction());

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Thu Dec  6 23:36:29 2012
@@ -142,16 +142,12 @@ public class CreateTableHandler extends 
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
     final int batchSize =
       this.conf.getInt("hbase.master.createtable.batchsize", 100);
-    HLog hlog = null;
     for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
       HRegionInfo newRegion = this.newRegions[regionIdx];
       // 1. Create HRegion
       HRegion region = HRegion.createHRegion(newRegion,
         this.fileSystemManager.getRootDir(), this.conf,
-        this.hTableDescriptor, hlog);
-      if (hlog == null) {
-        hlog = region.getLog();
-      }
+        this.hTableDescriptor, null, false, true);
 
       regionInfos.add(region.getRegionInfo());
       if (regionIdx % batchSize == 0) {
@@ -163,7 +159,6 @@ public class CreateTableHandler extends 
       // 3. Close the new region to flush to disk.  Close log file too.
       region.close();
     }
-    hlog.closeAndDelete();
     if (regionInfos.size() > 0) {
       MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
     }
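
The hunk above removes the shared-HLog bookkeeping from table creation:
instead of lazily grabbing an HLog from the first created region and closing
it at the end, each region is now created through the widened createHRegion()
overload added to HRegion.java later in this commit, with no log at all. A
hedged sketch of the new call shape (argument meanings follow that overload):

    // Sketch of the call in the hunk above; see the HRegion.java diff below.
    HRegion region = HRegion.createHRegion(newRegion,
        rootDir, conf, hTableDescriptor,
        null,    // hlog: none supplied
        false,   // initialize: region is not initialized here
        true);   // ignoreHLog: do not create a log when hlog is null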

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1418131&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Thu Dec  6 23:36:29 2012
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The CompactionTool allows you to run a compaction on a:
+ * <ul>
+ *  <li>table folder (all regions and families will be compacted)
+ *  <li>region folder (all families in the region will be compacted)
+ *  <li>family folder (the store files will be compacted)
+ * </ul>
+ */
+@InterfaceAudience.Public
+public class CompactionTool extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+
+  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
+  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
+  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
+  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
+
+  /**
+   * Class responsible for executing the compaction on the specified path.
+   * The path can be a table, region or family directory.
+   */
+  private static class CompactionWorker {
+    private final boolean keepCompactedFiles;
+    private final boolean deleteCompacted;
+    private final Configuration conf;
+    private final FileSystem fs;
+    private final Path tmpDir;
+
+    public CompactionWorker(final FileSystem fs, final Configuration conf) {
+      this.conf = conf;
+      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
+      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
+      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
+      this.fs = fs;
+    }
+
+    /**
+     * Execute the compaction on the specified path.
+     *
+     * @param path Directory path on which to run the compaction.
+     * @param compactOnce Execute just a single step of compaction.
+     */
+    public void compact(final Path path, final boolean compactOnce) throws IOException {
+      if (isFamilyDir(fs, path)) {
+        Path regionDir = path.getParent();
+        Path tableDir = regionDir.getParent();
+        HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
+        HRegion region = loadRegion(fs, conf, htd, regionDir);
+        compactStoreFiles(region, path, compactOnce);
+      } else if (isRegionDir(fs, path)) {
+        Path tableDir = path.getParent();
+        HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
+        compactRegion(htd, path, compactOnce);
+      } else if (isTableDir(fs, path)) {
+        compactTable(path, compactOnce);
+      } else {
+        throw new IOException(
+          "Specified path is not a table, region or family directory. path=" + path);
+      }
+    }
+
+    private void compactTable(final Path tableDir, final boolean compactOnce)
+        throws IOException {
+      HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
+      LOG.info("Compact table=" + htd.getNameAsString());
+      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
+        compactRegion(htd, regionDir, compactOnce);
+      }
+    }
+
+    private void compactRegion(final HTableDescriptor htd, final Path regionDir,
+        final boolean compactOnce) throws IOException {
+      HRegion region = loadRegion(fs, conf, htd, regionDir);
+      LOG.info("Compact table=" + htd.getNameAsString() +
+        " region=" + region.getRegionNameAsString());
+      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+        compactStoreFiles(region, familyDir, compactOnce);
+      }
+    }
+
+    /**
+     * Execute the actual compaction job.
+     * If the compact once flag is not specified, execute the compaction until
+     * no more compactions are needed. Uses the Configuration settings provided.
+     */
+    private void compactStoreFiles(final HRegion region, final Path familyDir,
+        final boolean compactOnce) throws IOException {
+      LOG.info("Compact table=" + region.getTableDesc().getNameAsString() +
+        " region=" + region.getRegionNameAsString() +
+        " family=" + familyDir.getName());
+      Store store = getStore(region, familyDir);
+      do {
+        CompactionRequest cr = store.requestCompaction();
+        StoreFile storeFile = store.compact(cr);
+        if (storeFile != null) {
+          if (keepCompactedFiles && deleteCompacted) {
+            fs.delete(storeFile.getPath(), false);
+          }
+        }
+      } while (store.needsCompaction() && !compactOnce);
+    }
+
+    /**
+     * Create a "mock" HStore that uses the tmpDir specified by the user and
+     * the store dir to compact as source.
+     */
+    private Store getStore(final HRegion region, final Path storeDir) throws IOException {
+      byte[] familyName = Bytes.toBytes(storeDir.getName());
+      HColumnDescriptor hcd = region.getTableDesc().getFamily(familyName);
+      // Create a Store w/ check of hbase.rootdir blanked out and return our
+      // list of files instead of having Store search its home dir.
+      return new Store(tmpDir, region, hcd, fs, conf) {
+        @Override
+        public FileStatus[] getStoreFiles() throws IOException {
+          return this.fs.listStatus(getHomedir());
+        }
+
+        @Override
+        Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
+          return storeDir;
+        }
+      };
+    }
+
+    private static HRegion loadRegion(final FileSystem fs, final Configuration conf,
+        final HTableDescriptor htd, final Path regionDir) throws IOException {
+      Path rootDir = regionDir.getParent().getParent();
+      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
+      return HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true);
+    }
+  }
+
+  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
+    Path regionInfo = new Path(path, HRegion.REGIONINFO_FILE);
+    return fs.exists(regionInfo);
+  }
+
+  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
+    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
+  }
+
+  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
+    return isRegionDir(fs, path.getParent());
+  }
+
+  private static class CompactionMapper
+      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
+    private CompactionWorker compactor = null;
+    private boolean compactOnce = false;
+
+    @Override
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
+
+      try {
+        FileSystem fs = FileSystem.get(conf);
+        this.compactor = new CompactionWorker(fs, conf);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not get the input FileSystem", e);
+      }
+    }
+
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws InterruptedException, IOException {
+      Path path = new Path(value.toString());
+      this.compactor.compact(path, compactOnce);
+    }
+  }
+
+  /**
+   * Input format that uses the store files' block locations as input split locality.
+   */
+  private static class CompactionInputFormat extends TextInputFormat {
+    @Override
+    protected boolean isSplitable(JobContext context, Path file) {
+      return true;
+    }
+
+    /**
+     * Returns a split for each store file directory, using the block
+     * locations of each file as the locality reference.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      List<FileStatus> files = listStatus(job);
+
+      Text key = new Text();
+      for (FileStatus file: files) {
+        Path path = file.getPath();
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        LineReader reader = new LineReader(fs.open(path));
+        long pos = 0;
+        int n;
+        try {
+          while ((n = reader.readLine(key)) > 0) {
+            String[] hosts = getStoreDirHosts(fs, path);
+            splits.add(new FileSplit(path, pos, n, hosts));
+            pos += n;
+          }
+        } finally {
+          reader.close();
+        }
+      }
+
+      return splits;
+    }
+
+    /**
+     * Return the top hosts of the store files, used by the split.
+     */
+    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
+        throws IOException {
+      FileStatus[] files = FSUtils.listStatus(fs, path, null);
+      if (files == null) {
+        return new String[] {};
+      }
+
+      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+      for (FileStatus hfileStatus: files) {
+        HDFSBlocksDistribution storeFileBlocksDistribution =
+          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
+        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+      }
+
+      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
+      return hosts.toArray(new String[hosts.size()]);
+    }
+
+    /**
+     * Create the input file for the given directories to compact.
+     * The file is a TextFile with each line corresponding to a
+     * store file directory to compact.
+     */
+    public static void createInputFile(final FileSystem fs, final Path path,
+        final Set<Path> toCompactDirs) throws IOException {
+      // Extract the list of store dirs
+      List<Path> storeDirs = new LinkedList<Path>();
+      for (Path compactDir: toCompactDirs) {
+        if (isFamilyDir(fs, compactDir)) {
+          storeDirs.add(compactDir);
+        } else if (isRegionDir(fs, compactDir)) {
+          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
+            storeDirs.add(familyDir);
+          }
+        } else if (isTableDir(fs, compactDir)) {
+          // Lookup regions
+          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
+            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+              storeDirs.add(familyDir);
+            }
+          }
+        } else {
+          throw new IOException(
+            "Specified path is not a table, region or family directory. path=" + compactDir);
+        }
+      }
+
+      // Write Input File
+      FSDataOutputStream stream = fs.create(path);
+      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
+      try {
+        final byte[] newLine = Bytes.toBytes("\n");
+        for (Path storeDir: storeDirs) {
+          stream.write(Bytes.toBytes(storeDir.toString()));
+          stream.write(newLine);
+        }
+      } finally {
+        stream.close();
+      }
+    }
+  }
+
+  /**
+   * Execute compaction, using a Map-Reduce job.
+   */
+  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
+      final boolean compactOnce) throws Exception {
+    Configuration conf = getConf();
+    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
+
+    Job job = new Job(conf);
+    job.setJobName("CompactionTool");
+    job.setJarByClass(CompactionTool.class);
+    job.setMapperClass(CompactionMapper.class);
+    job.setInputFormatClass(CompactionInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setMapSpeculativeExecution(false);
+    job.setNumReduceTasks(0);
+
+    String stagingName = "compact-" + EnvironmentEdgeManager.currentTimeMillis();
+    Path stagingDir = new Path(conf.get(CONF_TMP_DIR), stagingName);
+    fs.mkdirs(stagingDir);
+    try {
+      // Create input file with the store dirs
+      Path inputPath = new Path(stagingDir, stagingName);
+      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
+      CompactionInputFormat.addInputPath(job, inputPath);
+
+      // Initialize credential for secure cluster
+      TableMapReduceUtil.initCredentials(job);
+
+      // Start the MR Job and wait
+      return job.waitForCompletion(true) ? 0 : 1;
+    } finally {
+      fs.delete(stagingDir, true);
+    }
+  }
+
+  /**
+   * Execute compaction, from this client, one path at a time.
+   */
+  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
+      final boolean compactOnce) throws IOException {
+    CompactionWorker worker = new CompactionWorker(fs, getConf());
+    for (Path path: toCompactDirs) {
+      worker.compact(path, compactOnce);
+    }
+    return 0;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Set<Path> toCompactDirs = new HashSet<Path>();
+    boolean compactOnce = false;
+    boolean mapred = false;
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    try {
+      for (int i = 0; i < args.length; ++i) {
+        String opt = args[i];
+        if (opt.equals("-compactOnce")) {
+          compactOnce = true;
+        } else if (opt.equals("-mapred")) {
+          mapred = true;
+        } else if (!opt.startsWith("-")) {
+          Path path = new Path(opt);
+          FileStatus status = fs.getFileStatus(path);
+          if (!status.isDir()) {
+            printUsage("Specified path is not a directory. path=" + path);
+            return 1;
+          }
+          toCompactDirs.add(path);
+        } else {
+          printUsage();
+        }
+      }
+    } catch (Exception e) {
+      printUsage(e.getMessage());
+      return 1;
+    }
+
+    if (toCompactDirs.size() == 0) {
+      printUsage("No directories to compact specified.");
+      return 1;
+    }
+
+    // Execute compaction!
+    if (mapred) {
+      return doMapReduce(fs, toCompactDirs, compactOnce);
+    } else {
+      return doClient(fs, toCompactDirs, compactOnce);
+    }
+  }
+
+  private void printUsage() {
+    printUsage(null);
+  }
+
+  private void printUsage(final String message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: java " + this.getClass().getName() + " \\");
+    System.err.println("  [-compactOnce] [-mapred] [-D<property=value>]* files...");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" mapred         Use MapReduce to run compaction.");
+    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
+    System.err.println();
+    System.err.println("Note: -D properties will be applied to the conf used. ");
+    System.err.println("For example: ");
+    System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
+    System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
+    System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To compact the full 'TestTable' using MapReduce:");
+    System.err.println(" $ bin/hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/TestTable");
+    System.err.println();
+    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
+    System.err.println(" $ bin/hbase " + this.getClass().getName() + " hdfs:///hbase/TestTable/abc/x");
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
+  }
+}
\ No newline at end of file
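
To make the entry point concrete: a minimal sketch, not part of the patch, of
driving the tool programmatically the same way main() does via ToolRunner. The
family-directory path is hypothetical, taken from the usage text above; any
table, region or family directory would do.

    // Equivalent to: bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
    //   -compactOnce hdfs:///hbase/TestTable/abc/x
    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new CompactionTool(),
        new String[] { "-compactOnce", "hdfs:///hbase/TestTable/abc/x" });
    System.exit(exitCode);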

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1418131&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Thu Dec  6 23:36:29 2012
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Compact passed set of files.
+ * Create an instance and then call {@link #compact(Store, Collection, boolean, long)}.
+ */
+@InterfaceAudience.Private
+class Compactor extends Configured {
+  private static final Log LOG = LogFactory.getLog(Compactor.class);
+  private CompactionProgress progress;
+
+  Compactor(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles from a Store.
+   *
+   * @param store Store the files belong to
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @param maxId Readers maximum sequence id.
+   * @return Product of compaction or null if all cells expired or deleted and
+   * nothing made it through the compaction.
+   * @throws IOException
+   */
+  StoreFile.Writer compact(final Store store,
+      final Collection<StoreFile> filesToCompact,
+      final boolean majorCompaction, final long maxId)
+  throws IOException {
+    // Calculate maximum key count after compaction (for blooms)
+    // Also calculate earliest put timestamp if major compaction
+    int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file : filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r == null) {
+        LOG.warn("Null reader for " + file.getPath());
+        continue;
+      }
+      // NOTE: getFilterEntries could cause under-sized blooms if the user
+      //       switches bloom type (e.g. from ROW to ROWCOL)
+      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType()) ?
+          r.getFilterEntries() : r.getEntries();
+      maxKeyCount += keyCount;
+      // For major compactions calculate the earliest put timestamp
+      // of all involved storefiles. This is used to remove
+      // family delete markers during the compaction.
+      if (majorCompaction) {
+        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // There's a file with no information, must be an old one
+          // assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compacting " + file +
+          ", keycount=" + keyCount +
+          ", bloomtype=" + r.getBloomFilterType().toString() +
+          ", size=" + StringUtils.humanReadableInt(r.length()) +
+          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+          (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
+      }
+    }
+
+    // keep track of compaction progress
+    this.progress = new CompactionProgress(maxKeyCount);
+    // Get some configs
+    int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+    Compression.Algorithm compression = store.getFamily().getCompression();
+    // Avoid overriding compression setting for major compactions if the user
+    // has not specified it separately
+    Compression.Algorithm compactionCompression =
+      (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
+      store.getFamily().getCompactionCompression(): compression;
+
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners = StoreFileScanner
+      .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        if (store.getHRegion().getCoprocessorHost() != null) {
+          scanner = store.getHRegion()
+              .getCoprocessorHost()
+              .preCompactScannerOpen(store, scanners,
+                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+        }
+        if (scanner == null) {
+          Scan scan = new Scan();
+          scan.setMaxVersions(store.getFamily().getMaxVersions());
+          /* Include deletes, unless we are doing a major compaction */
+          scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
+            majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+            smallestReadPoint, earliestPutTs);
+        }
+        if (store.getHRegion().getCoprocessorHost() != null) {
+          InternalScanner cpScanner =
+            store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
+          // NULL scanner returned from coprocessor hooks means skip normal processing
+          if (cpScanner == null) {
+            return null;
+          }
+          scanner = cpScanner;
+        }
+
+        int bytesWritten = 0;
+        // since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, compactionKVMax);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
+
+              // check periodically to see if a system stop is requested
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  isInterrupted(store, writer);
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
+  }
+
+  void isInterrupted(final Store store, final StoreFile.Writer writer)
+  throws IOException {
+    if (store.getHRegion().areWritesEnabled()) return;
+    // Else cleanup.
+    writer.close();
+    store.getFileSystem().delete(writer.getPath(), false);
+    throw new InterruptedIOException("Aborting compaction of store " + store +
+      " in region " + store.getHRegion() + " because user requested stop.");
+  }
+
+  CompactionProgress getProgress() {
+    return this.progress;
+  }
+}
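
The Store.java changes below wire this class in: Store now constructs a single
Compactor and funnels both the request-driven and the explicit compaction
paths through it. A hedged sketch of the delegation; filesToCompact, isMajor
and maxId are placeholders for values Store derives from the CompactionRequest
and its readers.

    // Sketch of the delegation added in the Store.java hunks below.
    Compactor compactor = new Compactor(conf);
    StoreFile.Writer writer =
        compactor.compact(store, filesToCompact, isMajor, maxId);
    CompactionProgress progress = compactor.getProgress();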

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Dec  6 23:36:29 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -62,6 +63,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -230,12 +232,12 @@ public class HRegion implements HeapSize
    * The directory for the table this region is part of.
    * This directory contains the directory for this region.
    */
-  final Path tableDir;
+  private final Path tableDir;
 
-  final HLog log;
-  final FileSystem fs;
-  final Configuration conf;
-  final int rowLockWaitDuration;
+  private final HLog log;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
 
   // The internal wait duration to acquire a lock before read/update
@@ -256,8 +258,8 @@ public class HRegion implements HeapSize
   // purge timeout, when a RPC call will be terminated by the RPC engine.
   final long maxBusyWaitDuration;
 
-  final HRegionInfo regionInfo;
-  final Path regiondir;
+  private final HRegionInfo regionInfo;
+  private final Path regiondir;
   KeyValue.KVComparator comparator;
 
   private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -724,7 +726,7 @@ public class HRegion implements HeapSize
   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
     if (this.rsAccounting != null) {
       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
-    }  
+    }
     return this.memstoreSize.getAndAdd(memStoreSize);
   }
 
@@ -750,7 +752,7 @@ public class HRegion implements HeapSize
 
     // and then create the file
     Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-    
+
     // if datanode crashes or if the RS goes down just before the close is called while trying to
     // close the created regioninfo file in the .tmp directory then on next
     // creation we will be getting AlreadyCreatedException.
@@ -758,7 +760,7 @@ public class HRegion implements HeapSize
     if (FSUtils.isExists(fs, tmpPath)) {
       FSUtils.delete(fs, tmpPath, true);
     }
-    
+
     FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
 
     try {
@@ -775,6 +777,26 @@ public class HRegion implements HeapSize
     }
   }
 
+  /**
+   * @param fs
+   * @param dir
+   * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
+   * @throws IOException
+   */
+  public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
+  throws IOException {
+    Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
+    if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
+    FSDataInputStream in = fs.open(regioninfo);
+    try {
+      HRegionInfo hri = new HRegionInfo();
+      hri.readFields(in);
+      return hri;
+    } finally {
+      in.close();
+    }
+  }
+
   /** @return a HRegionInfo object for this region */
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
@@ -1021,19 +1043,16 @@ public class HRegion implements HeapSize
     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
   }
 
-  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+  static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
       final String threadNamePrefix) {
-    ThreadPoolExecutor openAndCloseThreadPool = Threads
-        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-            new ThreadFactory() {
-              private int count = 1;
-
-              public Thread newThread(Runnable r) {
-                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
-                return t;
-              }
-            });
-    return openAndCloseThreadPool;
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+      new ThreadFactory() {
+        private int count = 1;
+
+        public Thread newThread(Runnable r) {
+          return new Thread(r, threadNamePrefix + "-" + count++);
+        }
+      });
   }
 
    /**
@@ -1979,7 +1998,7 @@ public class HRegion implements HeapSize
     System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
     return batchMutate(mutationsAndLocks);
   }
-  
+
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
@@ -2333,7 +2352,7 @@ public class HRegion implements HeapSize
 
       // do after lock
       final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-            
+
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
       // null will be treated as unknown.
@@ -2636,7 +2655,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-    
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -3759,6 +3778,7 @@ public class HRegion implements HeapSize
    * @param conf
    * @param hTableDescriptor
    * @param hlog shared HLog
+   * @param initialize true to initialize the region
    * @return new HRegion
    *
    * @throws IOException
@@ -3766,7 +3786,36 @@ public class HRegion implements HeapSize
   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                       final Configuration conf,
                                       final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog)
+                                      final HLog hlog,
+                                      final boolean initialize)
+      throws IOException {
+    return createHRegion(info, rootDir, conf, hTableDescriptor,
+        hlog, initialize, false);
+  }
+
+  /**
+   * Convenience method creating new HRegions. Used by createTable.
+   * The {@link HLog} for the created region needs to be closed
+   * explicitly, if it is not null.
+   * Use {@link HRegion#getLog()} to get access.
+   *
+   * @param info Info for region to create.
+   * @param rootDir Root directory for HBase instance
+   * @param conf
+   * @param hTableDescriptor
+   * @param hlog shared HLog
+   * @param initialize true to initialize the region
+   * @param ignoreHLog true to skip generating a new hlog if it is null,
+   *        mostly for createTable
+   * @return new HRegion
+   *
+   * @throws IOException
+   */
+  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+                                      final Configuration conf,
+                                      final HTableDescriptor hTableDescriptor,
+                                      final HLog hlog,
+                                      final boolean initialize, final boolean ignoreHLog)
       throws IOException {
     LOG.info("creating HRegion " + info.getTableNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3778,16 +3827,26 @@ public class HRegion implements HeapSize
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     HLog effectiveHLog = hlog;
-    if (hlog == null) {
+    if (hlog == null && !ignoreHLog) {
       effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
           new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
     }
     HRegion region = HRegion.newHRegion(tableDir,
         effectiveHLog, fs, conf, info, hTableDescriptor, null);
-    region.initialize();
+    if (initialize) {
+      region.initialize();
+    }
     return region;
   }
 
+  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+                                      final Configuration conf,
+                                      final HTableDescriptor hTableDescriptor,
+                                      final HLog hlog)
+    throws IOException {
+    return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+  }
+
   /**
    * Open a Region.
    * @param info Info for region to be opened.
@@ -4299,7 +4358,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-    
+
     return results;
   }
 
@@ -4627,10 +4686,10 @@ public class HRegion implements HeapSize
       closeRegionOperation();
     }
 
-    
+
     long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-    
+
     if (flush) {
       // Request a cache flush. Do it outside update lock.
       requestFlush();
@@ -4755,7 +4814,7 @@ public class HRegion implements HeapSize
       long after = EnvironmentEdgeManager.currentTimeMillis();
       this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
     }
-    
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -5248,7 +5307,7 @@ public class HRegion implements HeapSize
    */
   private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
     if (numPutsWithoutWAL.getAndIncrement() == 0) {
-      LOG.info("writing data to region " + this + 
+      LOG.info("writing data to region " + this +
                " with WAL disabled. Data may be lost in the event of a crash.");
     }
 
@@ -5360,11 +5419,11 @@ public class HRegion implements HeapSize
     final HLog log = new HLog(fs, logdir, oldLogDir, c);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
-     } finally {
+    } finally {
        log.close();
        // TODO: is this still right?
        BlockCache bc = new CacheConfig(c).getBlockCache();
        if (bc != null) bc.shutdown();
-     }
+    }
   }
 }
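
The new loadDotRegionInfoFileContent() plus the widened createHRegion()
overload are what CompactionTool.loadRegion() (earlier in this commit) builds
on: read the .regioninfo file under a region directory, then create the
HRegion with no HLog and no initialization. A short sketch under that
assumption; regionDir is a hypothetical region directory.

    // Mirrors CompactionTool.loadRegion() from earlier in this commit.
    Path rootDir = regionDir.getParent().getParent();
    HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
    HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd,
        null /*hlog*/, false /*initialize*/, true /*ignoreHLog*/);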

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Dec  6 23:36:29 2012
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -132,9 +131,6 @@ public class Store extends SchemaConfigu
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final String storeNameStr;
-  private CompactionProgress progress;
-  private final int compactionKVMax;
   private final boolean verifyBulkLoads;
 
   /* The default priority for user-specified compaction requests.
@@ -158,10 +154,6 @@ public class Store extends SchemaConfigu
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
   private final int blocksize;
-  /** Compression algorithm for flush files and minor compaction */
-  private final Compression.Algorithm compression;
-  /** Compression algorithm for major compaction */
-  private final Compression.Algorithm compactionCompression;
   private HFileDataBlockEncoder dataBlockEncoder;
 
   /** Checksum configuration */
@@ -171,6 +163,8 @@ public class Store extends SchemaConfigu
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
+  private final Compactor compactor;
+
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -185,25 +179,16 @@ public class Store extends SchemaConfigu
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
     FileSystem fs, Configuration conf)
   throws IOException {
-    super(conf, region.getTableDesc().getNameAsString(),
+    super(conf, region.getRegionInfo().getTableNameAsString(),
         Bytes.toString(family.getName()));
-    HRegionInfo info = region.regionInfo;
+    HRegionInfo info = region.getRegionInfo();
     this.fs = fs;
-    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
-    if (!this.fs.exists(this.homedir)) {
-      if (!this.fs.mkdirs(this.homedir))
-        throw new IOException("Failed create of: " + this.homedir.toString());
-    }
+    Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+    this.homedir = createStoreHomeDir(this.fs, p);
     this.region = region;
     this.family = family;
     this.conf = conf;
     this.blocksize = family.getBlocksize();
-    this.compression = family.getCompression();
-    // avoid overriding compression setting for major compactions if the user
-    // has not specified it separately
-    this.compactionCompression =
-      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
-        family.getCompactionCompression() : this.compression;
 
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -228,7 +213,6 @@ public class Store extends SchemaConfigu
         "ms in store " + this);
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
-    this.storeNameStr = getColumnFamilyName();
 
     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
@@ -245,10 +229,8 @@ public class Store extends SchemaConfigu
       this.region.memstoreFlushSize);
     this.maxCompactSize
       = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-    this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
 
-    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
-        false);
+    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
@@ -260,6 +242,47 @@ public class Store extends SchemaConfigu
     this.checksumType = getChecksumType(conf);
     // initialize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
+    // Create a compaction tool instance
+    this.compactor = new Compactor(this.conf);
+  }
+
+  /**
+   * @param family
+   * @return
+   */
+  long getTTL(final HColumnDescriptor family) {
+    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
+    long ttl = family.getTimeToLive();
+    if (ttl == HConstants.FOREVER) {
+      // Default is unlimited ttl.
+      ttl = Long.MAX_VALUE;
+    } else if (ttl == -1) {
+      ttl = Long.MAX_VALUE;
+    } else {
+      // Second -> ms adjust for user data
+      ttl *= 1000;
+    }
+    return ttl;
+  }
+
+  /**
+   * Create this store's homedir
+   * @param fs
+   * @param homedir
+   * @return <code>homedir</code>
+   * @throws IOException
+   */
+  Path createStoreHomeDir(final FileSystem fs,
+      final Path homedir) throws IOException {
+    if (!fs.exists(homedir)) {
+      if (!fs.mkdirs(homedir))
+        throw new IOException("Failed create of: " + homedir.toString());
+    }
+    return homedir;
+  }
+
+  FileSystem getFileSystem() {
+    return this.fs;
   }
 
   /**
@@ -320,7 +343,7 @@ public class Store extends SchemaConfigu
    * Return the directory in which this store stores its
    * StoreFiles
    */
-  public Path getHomedir() {
+  Path getHomedir() {
     return homedir;
   }
 
@@ -339,6 +362,10 @@ public class Store extends SchemaConfigu
     this.dataBlockEncoder = blockEncoder;
   }
 
+  FileStatus [] getStoreFiles() throws IOException {
+    return FSUtils.listStatus(this.fs, this.homedir, null);
+  }
+
   /**
    * Creates an unsorted list of StoreFile loaded in parallel
    * from the given directory.
@@ -346,7 +373,7 @@ public class Store extends SchemaConfigu
    */
   private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-    FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
+    FileStatus files[] = getStoreFiles();
 
     if (files == null || files.length == 0) {
       return results;
@@ -637,7 +664,7 @@ public class Store extends SchemaConfigu
           storeFileCloserThreadPool.shutdownNow();
         }
       }
-      LOG.debug("closed " + this.storeNameStr);
+      LOG.info("Closed " + this);
       return result;
     } finally {
       this.lock.writeLock().unlock();
@@ -723,6 +750,7 @@ public class Store extends SchemaConfigu
       scanner = cpScanner;
     }
     try {
+      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
       // if we fail.
@@ -736,7 +764,7 @@ public class Store extends SchemaConfigu
           List<KeyValue> kvs = new ArrayList<KeyValue>();
           boolean hasMore;
           do {
-            hasMore = scanner.next(kvs, this.compactionKVMax);
+            hasMore = scanner.next(kvs, compactionKVMax);
             if (!kvs.isEmpty()) {
               for (KeyValue kv : kvs) {
                 // If we know that this KV is going to be included always, then let us
@@ -828,7 +856,7 @@ public class Store extends SchemaConfigu
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.compression, false);
+    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
   }
 
   /*
@@ -981,16 +1009,12 @@ public class Store extends SchemaConfigu
    * @param cr
    *          compaction details obtained from requestCompaction()
    * @throws IOException
+   * @return Storefile we compacted into or null if we failed or opted out early.
    */
-  void compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) {
-      return;
-    }
-    Preconditions.checkArgument(cr.getStore().toString()
-        .equals(this.toString()));
-
+  StoreFile compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) return null;
+    Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
     List<StoreFile> filesToCompact = cr.getFiles();
-
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
@@ -1002,19 +1026,26 @@ public class Store extends SchemaConfigu
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this.storeNameStr + " of "
+        + this + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
     try {
-      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
-          maxId);
+      StoreFile.Writer writer =
+        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
-      sf = completeCompaction(filesToCompact, writer);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().postCompact(this, sf);
+      if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+        sf = completeCompaction(filesToCompact, writer);
+        if (region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().postCompact(this, sf);
+        }
+      } else {
+        // Create storefile around what we wrote with a reader on it.
+        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
+        sf.createReader();
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1023,7 +1054,7 @@ public class Store extends SchemaConfigu
     }
 
     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+        + filesToCompact.size() + " file(s) in " + this + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into " +
         (sf == null ? "none" : sf.getPath().getName()) +
@@ -1031,6 +1062,7 @@ public class Store extends SchemaConfigu
           StringUtils.humanReadableInt(sf.getReader().length()))
         + "; total size for store is "
         + StringUtils.humanReadableInt(storeSize));
+    return sf;
   }
 
   /**
@@ -1070,7 +1102,8 @@ public class Store extends SchemaConfigu
 
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
+      StoreFile.Writer writer =
+        this.compactor.compact(this, filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (region.getCoprocessorHost() != null) {
@@ -1119,10 +1152,10 @@ public class Store extends SchemaConfigu
   }
 
   /** getter for CompactionProgress object
-   * @return CompactionProgress object
+   * @return CompactionProgress object; can be null
    */
   public CompactionProgress getCompactionProgress() {
-    return this.progress;
+    return this.compactor.getProgress();
   }
 
   /*
@@ -1174,19 +1207,19 @@ public class Store extends SchemaConfigu
         if (sf.isMajorCompaction() &&
             (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this.storeNameStr +
+            LOG.debug("Skipping major compaction of " + this +
                 " because one (major) compacted file only and oldestTime " +
                 oldest + "ms is < ttl=" + this.ttl);
           }
         } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
-          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+          LOG.debug("Major compaction triggered on store " + this +
             ", because keyvalues outdated; time since last major compaction " +
             (now - lowTimestamp) + "ms");
           result = true;
         }
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+          LOG.debug("Major compaction triggered on store " + this +
               "; time since last major compaction " + (now - lowTimestamp) + "ms");
         }
         result = true;
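
For readers following the TTL logic these log lines sit in, the decision reduces to a small rule. A sketch, with invented names, restating only what is visible in this hunk (the check that the major-compaction period has elapsed is assumed to have happened in surrounding code):

    // Sketch only -- restates the decision visible in the hunk above;
    // method and parameter names are invented for illustration.
    public class MajorCompactionRule {
      static final long FOREVER = Long.MAX_VALUE; // stand-in for HConstants.FOREVER

      public static boolean shouldMajorCompact(boolean oneMajorCompactedFileOnly,
          long oldestAgeMs, long ttlMs) {
        if (oneMajorCompactedFileOnly) {
          // A single already-major-compacted file is only worth rewriting
          // when TTL can actually expire cells out of it.
          return ttlMs != FOREVER && oldestAgeMs > ttlMs;
        }
        return true;
      }
    }
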
@@ -1376,12 +1409,12 @@ public class Store extends SchemaConfigu
              compactSelection.getFilesToCompact().get(pos).getReader().length()
                > maxCompactSize &&
              !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      compactSelection.clearSubList(0, pos);
+      if (pos != 0) compactSelection.clearSubList(0, pos);
     }
 
     if (compactSelection.getFilesToCompact().isEmpty()) {
       LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-        this.storeNameStr + ": no store files to compact");
+        this + ": no store files to compact");
       compactSelection.emptyFileList();
       return compactSelection;
     }
@@ -1468,7 +1501,7 @@ public class Store extends SchemaConfigu
       // if we don't have enough files to compact, just wait
       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this.storeNameStr
+          LOG.debug("Skipped compaction of " + this
             + ".  Only " + (end - start) + " file(s) of size "
             + StringUtils.humanReadableInt(totalSize)
             + " have met compaction criteria.");
@@ -1495,149 +1528,6 @@ public class Store extends SchemaConfigu
   }
 
   /**
-   * Do a minor/major compaction on an explicit set of storefiles in a Store.
-   * Uses the scan infrastructure to make it easy.
-   *
-   * @param filesToCompact which files to compact
-   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
-   * @param maxId Readers maximum sequence id.
-   * @return Product of compaction or null if all cells expired or deleted and
-   * nothing made it through the compaction.
-   * @throws IOException
-   */
-  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
-                               final boolean majorCompaction, final long maxId)
-      throws IOException {
-    // calculate maximum key count after compaction (for blooms)
-    int maxKeyCount = 0;
-    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
-    for (StoreFile file : filesToCompact) {
-      StoreFile.Reader r = file.getReader();
-      if (r != null) {
-        // NOTE: getFilterEntries could cause under-sized blooms if the user
-        //       switches bloom type (e.g. from ROW to ROWCOL)
-        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
-          ? r.getFilterEntries() : r.getEntries();
-        maxKeyCount += keyCount;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Compacting " + file +
-            ", keycount=" + keyCount +
-            ", bloomtype=" + r.getBloomFilterType().toString() +
-            ", size=" + StringUtils.humanReadableInt(r.length()) +
-            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
-        }
-      }
-      // For major compactions calculate the earliest put timestamp
-      // of all involved storefiles. This is used to remove 
-      // family delete marker during the compaction.
-      if (majorCompaction) {
-        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
-        if (tmp == null) {
-          // there's a file with no information, must be an old one
-          // assume we have very old puts
-          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
-        } else {
-          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
-        }
-      }
-    }
-
-    // keep track of compaction progress
-    progress = new CompactionProgress(maxKeyCount);
-
-    // For each file, obtain a scanner:
-    List<StoreFileScanner> scanners = StoreFileScanner
-      .getScannersForStoreFiles(filesToCompact, false, false, true);
-
-    // Make the instantiation lazy in case compaction produces no product; i.e.
-    // where all source cells are expired or deleted.
-    StoreFile.Writer writer = null;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = region.getSmallestReadPoint();
-    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
-    try {
-      InternalScanner scanner = null;
-      try {
-        if (getHRegion().getCoprocessorHost() != null) {
-          scanner = getHRegion()
-              .getCoprocessorHost()
-              .preCompactScannerOpen(this, scanners,
-                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
-        }
-        if (scanner == null) {
-          Scan scan = new Scan();
-          scan.setMaxVersions(getFamily().getMaxVersions());
-          /* Include deletes, unless we are doing a major compaction */
-          scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
-            majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
-            smallestReadPoint, earliestPutTs);
-        }
-        if (getHRegion().getCoprocessorHost() != null) {
-          InternalScanner cpScanner =
-            getHRegion().getCoprocessorHost().preCompact(this, scanner);
-          // NULL scanner returned from coprocessor hooks means skip normal processing
-          if (cpScanner == null) {
-            return null;
-          }
-          scanner = cpScanner;
-        }
-
-        int bytesWritten = 0;
-        // since scanner.next() can return 'false' but still be delivering data,
-        // we have to use a do/while loop.
-        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
-        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-        boolean hasMore;
-        do {
-          hasMore = scanner.next(kvs, this.compactionKVMax);
-          if (writer == null && !kvs.isEmpty()) {
-            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
-                true);
-          }
-          if (writer != null) {
-            // output to writer:
-            for (KeyValue kv : kvs) {
-              if (kv.getMemstoreTS() <= smallestReadPoint) {
-                kv.setMemstoreTS(0);
-              }
-              writer.append(kv);
-              // update progress per key
-              ++progress.currentCompactedKVs;
-
-              // check periodically to see if a system stop is requested
-              if (Store.closeCheckInterval > 0) {
-                bytesWritten += kv.getLength();
-                if (bytesWritten > Store.closeCheckInterval) {
-                  bytesWritten = 0;
-                  if (!this.region.areWritesEnabled()) {
-                    writer.close();
-                    fs.delete(writer.getPath(), false);
-                    throw new InterruptedIOException(
-                        "Aborting compaction of store " + this +
-                        " in region " + this.region +
-                        " because user requested stop.");
-                  }
-                }
-              }
-            }
-          }
-          kvs.clear();
-        } while (hasMore);
-      } finally {
-        if (scanner != null) {
-          scanner.close();
-        }
-      }
-    } finally {
-      if (writer != null) {
-        writer.appendMetadata(maxId, majorCompaction);
-        writer.close();
-      }
-    }
-    return writer;
-  }
-
-  /**
    * Validates a store file by opening and closing it. In HFileV2 this should
    * not be an expensive operation.
    *
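
The roughly 140 lines removed here are relocated, not deleted: the same scan-and-rewrite loop now lives in the Compactor class this commit adds, invoked via this.compactor.compact(...). Condensed from the TestCompaction hunk at the end of this mail, a sketch of driving it directly; the null check matters because, per the removed javadoc, compaction can produce no product when every cell is expired or deleted:

    // Sketch only -- condensed from the TestCompaction changes at the end
    // of this commit. Assumes an open Store and its current StoreFiles.
    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.Compactor;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    public class CompactorExample {
      public static StoreFile.Writer minorCompact(Configuration conf, Store store)
          throws IOException {
        List<StoreFile> files = store.getStorefiles();
        long maxId = StoreFile.getMaxSequenceIdInList(files);
        Compactor tool = new Compactor(conf);
        StoreFile.Writer writer = tool.compact(store, files, false, maxId);
        // null when all source cells were expired or deleted and nothing
        // made it through the compaction.
        return writer;
      }
    }
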
@@ -1741,7 +1631,7 @@ public class Store extends SchemaConfigu
 
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
-      LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+      LOG.error("Failed replacing compacted files in " + this +
         ". Compacted file is " + (result == null? "none": result.toString()) +
         ".  Files replaced " + compactedFiles.toString() +
         " some of which may have been already removed", e);
@@ -2027,7 +1917,7 @@ public class Store extends SchemaConfigu
         return mk.getRow();
       }
     } catch(IOException e) {
-      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+      LOG.warn("Failed getting store size for " + this, e);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -2080,7 +1970,7 @@ public class Store extends SchemaConfigu
 
   @Override
   public String toString() {
-    return this.storeNameStr;
+    return getColumnFamilyName();
   }
 
   /**
@@ -2196,7 +2086,7 @@ public class Store extends SchemaConfigu
   }
 
   HRegionInfo getHRegionInfo() {
-    return this.region.regionInfo;
+    return this.region.getRegionInfo();
   }
 
   /**
@@ -2324,8 +2214,8 @@ public class Store extends SchemaConfigu
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
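
The reference count drops from 20 to 17 and the int count from 6 to 5 because fields such as storeNameStr and the per-store progress no longer live on Store. Constants like this are normally verified against reflection-based estimates; the sketch below models that check, and the ClassSize.estimateBase call is an assumption patterned on HBase's heap-size tests, not code from this commit:

    // Sketch only -- how a heap-size constant like FIXED_OVERHEAD is
    // typically verified; estimateBase usage is an assumption modeled on
    // HBase's TestHeapSize, not code from this commit.
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.util.ClassSize;

    public class FixedOverheadCheck {
      public static void main(String[] args) {
        long expected = ClassSize.estimateBase(Store.class, false);
        if (expected != Store.FIXED_OVERHEAD) {
          // Re-run with debug=true to print a field-by-field breakdown.
          ClassSize.estimateBase(Store.class, true);
          throw new AssertionError("expected=" + expected
              + " but FIXED_OVERHEAD=" + Store.FIXED_OVERHEAD);
        }
      }
    }
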

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Thu Dec  6 23:36:29 2012
@@ -49,5 +49,4 @@ public class CompactionProgress {
   public float getProgressPct() {
     return currentCompactedKVs / totalCompactingKVs;
   }
-
 }
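
One caveat worth flagging while this file is open: assuming both counters are long, as the per-KeyValue increments in Store suggest, the division above truncates, so getProgressPct() reports 0.0 until the compaction finishes. A fix sketch, not part of this patch:

    // Sketch only -- not part of this patch. Assumes both counters are
    // long; without the cast the division happens in long arithmetic and
    // truncates to 0 until current equals total.
    public class CompactionProgressFix {
      public long totalCompactingKVs;
      public long currentCompactedKVs;

      public float getProgressPct() {
        // Cast one operand first so the division happens in float.
        return (float) currentCompactedKVs / totalCompactingKVs;
      }
    }
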

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java Thu Dec  6 23:36:29 2012
@@ -25,9 +25,6 @@ import java.util.zip.Checksum;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-
 /**
  * Checksum types. The Checksum type is a one byte number
  * that stores a representation of the checksum algorithm
@@ -70,7 +67,7 @@ public enum ChecksumType {
         ctor = ChecksumFactory.newConstructor(PURECRC32);
         LOG.info("Checksum using " + PURECRC32);
       } catch (Exception e) {
-        LOG.info(PURECRC32 + " not available.");
+        LOG.trace(PURECRC32 + " not available.");
       }
       try {
         // The default checksum class name is java.util.zip.CRC32. 
@@ -80,7 +77,7 @@ public enum ChecksumType {
           LOG.info("Checksum can use " + JDKCRC);
         }
       } catch (Exception e) {
-        LOG.warn(JDKCRC + " not available. ",  e);
+        LOG.trace(JDKCRC + " not available.");
       }
     }
 
@@ -113,7 +110,7 @@ public enum ChecksumType {
         ctor = ChecksumFactory.newConstructor(PURECRC32C);
         LOG.info("Checksum can use " + PURECRC32C);
       } catch (Exception e) {
-        LOG.info(PURECRC32C + " not available. ");
+        LOG.trace(PURECRC32C + " not available.");
       }
     }
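
Demoting these messages to trace fits their role: failing to load a checksum class is the expected outcome on older Hadoop, and the enum simply probes and falls back. A self-contained sketch of that pattern using plain JDK reflection (ChecksumFactory itself is not shown in this diff):

    // Sketch only -- illustrates the probe-and-fall-back pattern this enum
    // uses; plain JDK reflection stands in for ChecksumFactory.
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    public class ChecksumProbe {
      public static Checksum bestCrc32() {
        try {
          // Present on newer Hadoop, absent elsewhere; absence is routine,
          // hence the trace-level logging in the real code.
          Class<?> pure = Class.forName("org.apache.hadoop.util.PureJavaCrc32");
          return (Checksum) pure.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          return new CRC32(); // JDK fallback, always available
        }
      }
    }
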
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Dec  6 23:36:29 2012
@@ -151,7 +151,7 @@ public abstract class FSUtils {
    */
   public static FSDataOutputStream create(FileSystem fs, Path path,
       FsPermission perm, boolean overwrite) throws IOException {
-    LOG.debug("Creating file:" + path + "with permission:" + perm);
+    LOG.debug("Creating file=" + path + " with permission=" + perm);
 
     return fs.create(path, perm, overwrite,
         fs.getConf().getInt("io.file.buffer.size", 4096),
@@ -1013,6 +1013,25 @@ public abstract class FSUtils {
   }
 
   /**
+   * Given a particular region dir, return all the familydirs inside it
+   *
+   * @param fs A file system for the Path
+   * @param regionDir Path to a specific region directory
+   * @return List of paths to valid family directories in region dir.
+   * @throws IOException
+   */
+  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+    // assumes we are in a region dir.
+    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+    List<Path> familyDirs = new ArrayList<Path>(fds.length);
+    for (FileStatus fdfs: fds) {
+      Path fdPath = fdfs.getPath();
+      familyDirs.add(fdPath);
+    }
+    return familyDirs;
+  }
+
+  /**
    * Filter for HFiles that excludes reference files.
    */
   public static class HFileFilter implements PathFilter {
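
A typical caller of the new helper walks a region directory on disk, much as the backported CompactionTool does. A sketch with a made-up region path:

    // Sketch only -- a hypothetical caller of the new FSUtils.getFamilyDirs;
    // the region directory argument below is made up for illustration.
    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class FamilyDirWalker {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path regionDir = new Path(args[0]); // e.g. .../<table>/<encoded-region>
        List<Path> families = FSUtils.getFamilyDirs(fs, regionDir);
        for (Path family : families) {
          System.out.println("family dir: " + family);
        }
      }
    }
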

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Dec  6 23:36:29 2012
@@ -587,8 +587,10 @@ public class TestCompaction extends HBas
 
     List<StoreFile> storeFiles = store.getStorefiles();
     long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    Compactor tool = new Compactor(this.conf);
 
-    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+    StoreFile.Writer compactedFile =
+      tool.compact(store, storeFiles, false, maxId);
 
     // Now lets corrupt the compacted file.
     FileSystem fs = FileSystem.get(conf);