Posted to commits@hbase.apache.org by st...@apache.org on 2012/12/06 06:59:51 UTC

svn commit: r1417740 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions...

Author: stack
Date: Thu Dec  6 05:59:46 2012
New Revision: 1417740

URL: http://svn.apache.org/viewvc?rev=1417740&view=rev
Log:
HBASE-7253 Backport Compaction Tool to 0.94; REVERT

Removed:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Dec  6 05:59:46 2012
@@ -645,7 +645,7 @@ public class LruBlockCache implements Bl
     // Log size
     long totalSize = heapSize();
     long freeSize = maxSize - totalSize;
-    LruBlockCache.LOG.debug("Stats: " +
+    LruBlockCache.LOG.debug("LRU Stats: " +
         "total=" + StringUtils.byteDesc(totalSize) + ", " +
         "free=" + StringUtils.byteDesc(freeSize) + ", " +
         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@@ -653,11 +653,11 @@ public class LruBlockCache implements Bl
         "accesses=" + stats.getRequestCount() + ", " +
         "hits=" + stats.getHitCount() + ", " +
         "hitRatio=" +
-          (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
+          (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) +
         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
         "cachingHits=" + stats.getHitCachingCount() + ", " +
         "cachingHitsRatio=" +
-          (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " +
+          (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) +
         "evictions=" + stats.getEvictionCount() + ", " +
         "evicted=" + stats.getEvictedCount() + ", " +
         "evictedPerRun=" + stats.evictedPerEviction());

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Thu Dec  6 05:59:46 2012
@@ -142,12 +142,16 @@ public class CreateTableHandler extends 
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
     final int batchSize =
       this.conf.getInt("hbase.master.createtable.batchsize", 100);
+    HLog hlog = null;
     for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
       HRegionInfo newRegion = this.newRegions[regionIdx];
       // 1. Create HRegion
       HRegion region = HRegion.createHRegion(newRegion,
         this.fileSystemManager.getRootDir(), this.conf,
-        this.hTableDescriptor, null, false, true);
+        this.hTableDescriptor, hlog);
+      if (hlog == null) {
+        hlog = region.getLog();
+      }
 
       regionInfos.add(region.getRegionInfo());
       if (regionIdx % batchSize == 0) {
@@ -159,6 +163,7 @@ public class CreateTableHandler extends 
       // 3. Close the new region to flush to disk.  Close log file too.
       region.close();
     }
+    hlog.closeAndDelete();
     if (regionInfos.size() > 0) {
       MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
     }
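
For reference, a minimal sketch of the table-creation flow restored above (the helper
method shape and variable names are hypothetical; the createHRegion/getLog/closeAndDelete
calls mirror the patch): the first createHRegion call is allowed to allocate an HLog,
every later region reuses it, and the handler deletes the temporary log once all regions
have been closed.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    class CreateRegionsSketch {
      static List<HRegionInfo> createRegions(HRegionInfo[] newRegions, Path rootDir,
          Configuration conf, HTableDescriptor htd) throws IOException {
        List<HRegionInfo> created = new ArrayList<HRegionInfo>();
        HLog hlog = null;                  // taken from the first created region
        for (HRegionInfo hri : newRegions) {
          HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd, hlog);
          if (hlog == null) {
            hlog = region.getLog();        // reuse this log for the remaining regions
          }
          created.add(region.getRegionInfo());
          region.close();                  // flush the region; the shared log stays open
        }
        if (hlog != null) {
          hlog.closeAndDelete();           // the create-table log is no longer needed
        }
        return created;
      }
    }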

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Dec  6 05:59:46 2012
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.EOFException;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -63,7 +62,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -232,12 +230,12 @@ public class HRegion implements HeapSize
    * The directory for the table this region is part of.
    * This directory contains the directory for this region.
    */
-  private final Path tableDir;
+  final Path tableDir;
 
-  private final HLog log;
-  private final FileSystem fs;
-  private final Configuration conf;
-  private final int rowLockWaitDuration;
+  final HLog log;
+  final FileSystem fs;
+  final Configuration conf;
+  final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
 
   // The internal wait duration to acquire a lock before read/update
@@ -258,8 +256,8 @@ public class HRegion implements HeapSize
   // purge timeout, when a RPC call will be terminated by the RPC engine.
   final long maxBusyWaitDuration;
 
-  private final HRegionInfo regionInfo;
-  private final Path regiondir;
+  final HRegionInfo regionInfo;
+  final Path regiondir;
   KeyValue.KVComparator comparator;
 
   private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -726,7 +724,7 @@ public class HRegion implements HeapSize
   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
     if (this.rsAccounting != null) {
       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
-    }
+    }  
     return this.memstoreSize.getAndAdd(memStoreSize);
   }
 
@@ -752,7 +750,7 @@ public class HRegion implements HeapSize
 
     // and then create the file
     Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-
+    
     // if datanode crashes or if the RS goes down just before the close is called while trying to
     // close the created regioninfo file in the .tmp directory then on next
     // creation we will be getting AlreadyCreatedException.
@@ -760,7 +758,7 @@ public class HRegion implements HeapSize
     if (FSUtils.isExists(fs, tmpPath)) {
       FSUtils.delete(fs, tmpPath, true);
     }
-
+    
     FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
 
     try {
@@ -777,26 +775,6 @@ public class HRegion implements HeapSize
     }
   }
 
-  /**
-   * @param fs
-   * @param dir
-   * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
-   * @throws IOException
-   */
-  public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
-  throws IOException {
-    Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
-    if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
-    FSDataInputStream in = fs.open(regioninfo);
-    try {
-      HRegionInfo hri = new HRegionInfo();
-      hri.readFields(in);
-      return hri;
-    } finally {
-      in.close();
-    }
-  }
-
   /** @return a HRegionInfo object for this region */
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
@@ -1043,16 +1021,19 @@ public class HRegion implements HeapSize
     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
   }
 
-  static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
       final String threadNamePrefix) {
-    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-      new ThreadFactory() {
-        private int count = 1;
-
-        public Thread newThread(Runnable r) {
-          return new Thread(r, threadNamePrefix + "-" + count++);
-        }
-      });
+    ThreadPoolExecutor openAndCloseThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                return t;
+              }
+            });
+    return openAndCloseThreadPool;
   }
 
    /**
@@ -1998,7 +1979,7 @@ public class HRegion implements HeapSize
     System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
     return batchMutate(mutationsAndLocks);
   }
-
+  
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
@@ -2352,7 +2333,7 @@ public class HRegion implements HeapSize
 
       // do after lock
       final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-
+            
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
       // null will be treated as unknown.
@@ -2655,7 +2636,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
+    
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -3778,7 +3759,6 @@ public class HRegion implements HeapSize
    * @param conf
    * @param hTableDescriptor
    * @param hlog shared HLog
-   * @param boolean initialize - true to initialize the region
    * @return new HRegion
    *
    * @throws IOException
@@ -3786,36 +3766,7 @@ public class HRegion implements HeapSize
   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                       final Configuration conf,
                                       final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog,
-                                      final boolean initialize)
-      throws IOException {
-    return createHRegion(info, rootDir, conf, hTableDescriptor,
-        hlog, initialize, false);
-  }
-
-  /**
-   * Convenience method creating new HRegions. Used by createTable.
-   * The {@link HLog} for the created region needs to be closed
-   * explicitly, if it is not null.
-   * Use {@link HRegion#getLog()} to get access.
-   *
-   * @param info Info for region to create.
-   * @param rootDir Root directory for HBase instance
-   * @param conf
-   * @param hTableDescriptor
-   * @param hlog shared HLog
-   * @param boolean initialize - true to initialize the region
-   * @param boolean ignoreHLog
-      - true to skip generate new hlog if it is null, mostly for createTable
-   * @return new HRegion
-   *
-   * @throws IOException
-   */
-  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
-                                      final Configuration conf,
-                                      final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog,
-                                      final boolean initialize, final boolean ignoreHLog)
+                                      final HLog hlog)
       throws IOException {
     LOG.info("creating HRegion " + info.getTableNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3827,26 +3778,16 @@ public class HRegion implements HeapSize
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     HLog effectiveHLog = hlog;
-    if (hlog == null && !ignoreHLog) {
+    if (hlog == null) {
       effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
           new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
     }
     HRegion region = HRegion.newHRegion(tableDir,
         effectiveHLog, fs, conf, info, hTableDescriptor, null);
-    if (initialize) {
-      region.initialize();
-    }
+    region.initialize();
     return region;
   }
 
-  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
-                                      final Configuration conf,
-                                      final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog)
-    throws IOException {
-    return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
-  }
-
   /**
    * Open a Region.
    * @param info Info for region to be opened.
@@ -4358,7 +4299,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-
+    
     return results;
   }
 
@@ -4686,10 +4627,10 @@ public class HRegion implements HeapSize
       closeRegionOperation();
     }
 
-
+    
     long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+    
     if (flush) {
       // Request a cache flush. Do it outside update lock.
       requestFlush();
@@ -4814,7 +4755,7 @@ public class HRegion implements HeapSize
       long after = EnvironmentEdgeManager.currentTimeMillis();
       this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
     }
-
+    
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -5307,7 +5248,7 @@ public class HRegion implements HeapSize
    */
   private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
     if (numPutsWithoutWAL.getAndIncrement() == 0) {
-      LOG.info("writing data to region " + this +
+      LOG.info("writing data to region " + this + 
                " with WAL disabled. Data may be lost in the event of a crash.");
     }
 
@@ -5419,11 +5360,11 @@ public class HRegion implements HeapSize
     final HLog log = new HLog(fs, logdir, oldLogDir, c);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
-    } finally {
+     } finally {
        log.close();
        // TODO: is this still right?
        BlockCache bc = new CacheConfig(c).getBlockCache();
        if (bc != null) bc.shutdown();
-    }
+     }
   }
 }
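
After this revert there is again a single createHRegion overload: passing a null HLog
makes the region create its own log under the region directory, and the region is always
initialized. A hedged caller sketch (root dir, table and family names are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    class CreateHRegionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path rootDir = new Path("/tmp/hbase-sketch");       // hypothetical root dir
        HTableDescriptor htd = new HTableDescriptor("t1");
        htd.addFamily(new HColumnDescriptor("f"));
        HRegionInfo hri = new HRegionInfo(htd.getName(), null, null);
        HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd, null);
        try {
          // ... put/get against the standalone region ...
        } finally {
          region.close();
          region.getLog().closeAndDelete();  // caller owns the log created on its behalf
        }
      }
    }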

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Dec  6 05:59:46 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -131,6 +132,9 @@ public class Store extends SchemaConfigu
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final String storeNameStr;
+  private CompactionProgress progress;
+  private final int compactionKVMax;
   private final boolean verifyBulkLoads;
 
   /* The default priority for user-specified compaction requests.
@@ -154,6 +158,10 @@ public class Store extends SchemaConfigu
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
   private final int blocksize;
+  /** Compression algorithm for flush files and minor compaction */
+  private final Compression.Algorithm compression;
+  /** Compression algorithm for major compaction */
+  private final Compression.Algorithm compactionCompression;
   private HFileDataBlockEncoder dataBlockEncoder;
 
   /** Checksum configuration */
@@ -163,8 +171,6 @@ public class Store extends SchemaConfigu
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
-  private final Compactor compactor;
-
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -179,16 +185,25 @@ public class Store extends SchemaConfigu
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
     FileSystem fs, Configuration conf)
   throws IOException {
-    super(conf, region.getRegionInfo().getTableNameAsString(),
+    super(conf, region.getTableDesc().getNameAsString(),
         Bytes.toString(family.getName()));
-    HRegionInfo info = region.getRegionInfo();
+    HRegionInfo info = region.regionInfo;
     this.fs = fs;
-    Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
-    this.homedir = createStoreHomeDir(this.fs, p);
+    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+    if (!this.fs.exists(this.homedir)) {
+      if (!this.fs.mkdirs(this.homedir))
+        throw new IOException("Failed create of: " + this.homedir.toString());
+    }
     this.region = region;
     this.family = family;
     this.conf = conf;
     this.blocksize = family.getBlocksize();
+    this.compression = family.getCompression();
+    // avoid overriding compression setting for major compactions if the user
+    // has not specified it separately
+    this.compactionCompression =
+      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
+        family.getCompactionCompression() : this.compression;
 
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -213,6 +228,7 @@ public class Store extends SchemaConfigu
         "ms in store " + this);
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
+    this.storeNameStr = getColumnFamilyName();
 
     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
@@ -229,8 +245,10 @@ public class Store extends SchemaConfigu
       this.region.memstoreFlushSize);
     this.maxCompactSize
       = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
+    this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
 
-    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
+    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
+        false);
 
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
@@ -242,47 +260,6 @@ public class Store extends SchemaConfigu
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction tool instance
-    this.compactor = new Compactor(this.conf);
-  }
-
-  /**
-   * @param family
-   * @return
-   */
-  long getTTL(final HColumnDescriptor family) {
-    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
-    long ttl = family.getTimeToLive();
-    if (ttl == HConstants.FOREVER) {
-      // Default is unlimited ttl.
-      ttl = Long.MAX_VALUE;
-    } else if (ttl == -1) {
-      ttl = Long.MAX_VALUE;
-    } else {
-      // Second -> ms adjust for user data
-      ttl *= 1000;
-    }
-    return ttl;
-  }
-
-  /**
-   * Create this store's homedir
-   * @param fs
-   * @param homedir
-   * @return Return <code>homedir</code>
-   * @throws IOException
-   */
-  Path createStoreHomeDir(final FileSystem fs,
-      final Path homedir) throws IOException {
-    if (!fs.exists(homedir)) {
-      if (!fs.mkdirs(homedir))
-        throw new IOException("Failed create of: " + homedir.toString());
-    }
-    return homedir;
-  }
-
-  FileSystem getFileSystem() {
-    return this.fs;
   }
 
   /**
@@ -343,7 +320,7 @@ public class Store extends SchemaConfigu
    * Return the directory in which this store stores its
    * StoreFiles
    */
-  Path getHomedir() {
+  public Path getHomedir() {
     return homedir;
   }
 
@@ -362,10 +339,6 @@ public class Store extends SchemaConfigu
     this.dataBlockEncoder = blockEncoder;
   }
 
-  FileStatus [] getStoreFiles() throws IOException {
-    return FSUtils.listStatus(this.fs, this.homedir, null);
-  }
-
   /**
    * Creates an unsorted list of StoreFile loaded in parallel
    * from the given directory.
@@ -373,7 +346,7 @@ public class Store extends SchemaConfigu
    */
   private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-    FileStatus files[] = getStoreFiles();
+    FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
 
     if (files == null || files.length == 0) {
       return results;
@@ -664,7 +637,7 @@ public class Store extends SchemaConfigu
           storeFileCloserThreadPool.shutdownNow();
         }
       }
-      LOG.info("Closed " + this);
+      LOG.debug("closed " + this.storeNameStr);
       return result;
     } finally {
       this.lock.writeLock().unlock();
@@ -750,7 +723,6 @@ public class Store extends SchemaConfigu
       scanner = cpScanner;
     }
     try {
-      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
       // if we fail.
@@ -764,7 +736,7 @@ public class Store extends SchemaConfigu
           List<KeyValue> kvs = new ArrayList<KeyValue>();
           boolean hasMore;
           do {
-            hasMore = scanner.next(kvs, compactionKVMax);
+            hasMore = scanner.next(kvs, this.compactionKVMax);
             if (!kvs.isEmpty()) {
               for (KeyValue kv : kvs) {
                 // If we know that this KV is going to be included always, then let us
@@ -856,7 +828,7 @@ public class Store extends SchemaConfigu
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+    return createWriterInTmp(maxKeyCount, this.compression, false);
   }
 
   /*
@@ -1009,12 +981,16 @@ public class Store extends SchemaConfigu
    * @param cr
    *          compaction details obtained from requestCompaction()
    * @throws IOException
-   * @return Storefile we compacted into or null if we failed or opted out early.
    */
-  StoreFile compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) return null;
-    Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
+  void compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) {
+      return;
+    }
+    Preconditions.checkArgument(cr.getStore().toString()
+        .equals(this.toString()));
+
     List<StoreFile> filesToCompact = cr.getFiles();
+
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
@@ -1026,26 +1002,19 @@ public class Store extends SchemaConfigu
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this + " of "
+        + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
     try {
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
+      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
+          maxId);
       // Move the compaction into place.
-      if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
-        sf = completeCompaction(filesToCompact, writer);
-        if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompact(this, sf);
-        }
-      } else {
-        // Create storefile around what we wrote with a reader on it.
-        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
-          this.family.getBloomFilterType(), this.dataBlockEncoder);
-        sf.createReader();
+      sf = completeCompaction(filesToCompact, writer);
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postCompact(this, sf);
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1054,7 +1023,7 @@ public class Store extends SchemaConfigu
     }
 
     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this + " of "
+        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into " +
         (sf == null ? "none" : sf.getPath().getName()) +
@@ -1062,7 +1031,6 @@ public class Store extends SchemaConfigu
           StringUtils.humanReadableInt(sf.getReader().length()))
         + "; total size for store is "
         + StringUtils.humanReadableInt(storeSize));
-    return sf;
   }
 
   /**
@@ -1102,8 +1070,7 @@ public class Store extends SchemaConfigu
 
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, isMajor, maxId);
+      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (region.getCoprocessorHost() != null) {
@@ -1152,10 +1119,10 @@ public class Store extends SchemaConfigu
   }
 
   /** getter for CompactionProgress object
-   * @return CompactionProgress object; can be null
+   * @return CompactionProgress object
    */
   public CompactionProgress getCompactionProgress() {
-    return this.compactor.getProgress();
+    return this.progress;
   }
 
   /*
@@ -1207,19 +1174,19 @@ public class Store extends SchemaConfigu
         if (sf.isMajorCompaction() &&
             (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this +
+            LOG.debug("Skipping major compaction of " + this.storeNameStr +
                 " because one (major) compacted file only and oldestTime " +
                 oldest + "ms is < ttl=" + this.ttl);
           }
         } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
-          LOG.debug("Major compaction triggered on store " + this +
+          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
             ", because keyvalues outdated; time since last major compaction " +
             (now - lowTimestamp) + "ms");
           result = true;
         }
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
+          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
               "; time since last major compaction " + (now - lowTimestamp) + "ms");
         }
         result = true;
@@ -1409,12 +1376,12 @@ public class Store extends SchemaConfigu
              compactSelection.getFilesToCompact().get(pos).getReader().length()
                > maxCompactSize &&
              !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      if (pos != 0) compactSelection.clearSubList(0, pos);
+      compactSelection.clearSubList(0, pos);
     }
 
     if (compactSelection.getFilesToCompact().isEmpty()) {
       LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-        this + ": no store files to compact");
+        this.storeNameStr + ": no store files to compact");
       compactSelection.emptyFileList();
       return compactSelection;
     }
@@ -1501,7 +1468,7 @@ public class Store extends SchemaConfigu
       // if we don't have enough files to compact, just wait
       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this
+          LOG.debug("Skipped compaction of " + this.storeNameStr
             + ".  Only " + (end - start) + " file(s) of size "
             + StringUtils.humanReadableInt(totalSize)
             + " have met compaction criteria.");
@@ -1528,6 +1495,149 @@ public class Store extends SchemaConfigu
   }
 
   /**
+   * Do a minor/major compaction on an explicit set of storefiles in a Store.
+   * Uses the scan infrastructure to make it easy.
+   *
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @param maxId Readers maximum sequence id.
+   * @return Product of compaction or null if all cells expired or deleted and
+   * nothing made it through the compaction.
+   * @throws IOException
+   */
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+                               final boolean majorCompaction, final long maxId)
+      throws IOException {
+    // calculate maximum key count after compaction (for blooms)
+    int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file : filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r != null) {
+        // NOTE: getFilterEntries could cause under-sized blooms if the user
+        //       switches bloom type (e.g. from ROW to ROWCOL)
+        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
+          ? r.getFilterEntries() : r.getEntries();
+        maxKeyCount += keyCount;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Compacting " + file +
+            ", keycount=" + keyCount +
+            ", bloomtype=" + r.getBloomFilterType().toString() +
+            ", size=" + StringUtils.humanReadableInt(r.length()) +
+            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
+        }
+      }
+      // For major compactions calculate the earliest put timestamp
+      // of all involved storefiles. This is used to remove 
+      // family delete marker during the compaction.
+      if (majorCompaction) {
+        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // there's a file with no information, must be an old one
+          // assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+    }
+
+    // keep track of compaction progress
+    progress = new CompactionProgress(maxKeyCount);
+
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners = StoreFileScanner
+      .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = region.getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        if (getHRegion().getCoprocessorHost() != null) {
+          scanner = getHRegion()
+              .getCoprocessorHost()
+              .preCompactScannerOpen(this, scanners,
+                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+        }
+        if (scanner == null) {
+          Scan scan = new Scan();
+          scan.setMaxVersions(getFamily().getMaxVersions());
+          /* Include deletes, unless we are doing a major compaction */
+          scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
+            majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+            smallestReadPoint, earliestPutTs);
+        }
+        if (getHRegion().getCoprocessorHost() != null) {
+          InternalScanner cpScanner =
+            getHRegion().getCoprocessorHost().preCompact(this, scanner);
+          // NULL scanner returned from coprocessor hooks means skip normal processing
+          if (cpScanner == null) {
+            return null;
+          }
+          scanner = cpScanner;
+        }
+
+        int bytesWritten = 0;
+        // since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, this.compactionKVMax);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
+                true);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
+
+              // check periodically to see if a system stop is requested
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  if (!this.region.areWritesEnabled()) {
+                    writer.close();
+                    fs.delete(writer.getPath(), false);
+                    throw new InterruptedIOException(
+                        "Aborting compaction of store " + this +
+                        " in region " + this.region +
+                        " because user requested stop.");
+                  }
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
+  }
+
+  /**
    * Validates a store file by opening and closing it. In HFileV2 this should
    * not be an expensive operation.
    *
@@ -1631,7 +1741,7 @@ public class Store extends SchemaConfigu
 
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
-      LOG.error("Failed replacing compacted files in " + this +
+      LOG.error("Failed replacing compacted files in " + this.storeNameStr +
         ". Compacted file is " + (result == null? "none": result.toString()) +
         ".  Files replaced " + compactedFiles.toString() +
         " some of which may have been already removed", e);
@@ -1917,7 +2027,7 @@ public class Store extends SchemaConfigu
         return mk.getRow();
       }
     } catch(IOException e) {
-      LOG.warn("Failed getting store size for " + this, e);
+      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1970,7 +2080,7 @@ public class Store extends SchemaConfigu
 
   @Override
   public String toString() {
-    return getColumnFamilyName();
+    return this.storeNameStr;
   }
 
   /**
@@ -2086,7 +2196,7 @@ public class Store extends SchemaConfigu
   }
 
   HRegionInfo getHRegionInfo() {
-    return this.region.getRegionInfo();
+    return this.region.regionInfo;
   }
 
   /**
@@ -2214,8 +2324,8 @@ public class Store extends SchemaConfigu
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+          + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
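
The restored compactStore drains the compaction scanner in bounded batches with a
do/while, because next() can return false while still handing back a final batch of
KeyValues. A self-contained illustration of just that loop shape (the Scanner interface
and drain() below are stand-ins, not HBase APIs):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    class BatchedDrainSketch {
      interface Scanner<T> {
        /** Fills out with up to limit items; returns true while more remain. */
        boolean next(List<T> out, int limit);
      }

      static <T> long drain(Scanner<T> scanner, int batchLimit) {
        List<T> batch = new ArrayList<T>();
        long consumed = 0;
        boolean hasMore;
        do {
          hasMore = scanner.next(batch, batchLimit);
          consumed += batch.size();   // "writer.append(kv)" in the real loop
          batch.clear();              // the batch list is reused, as in compactStore
        } while (hasMore);
        return consumed;
      }

      public static void main(String[] args) {
        final Iterator<Integer> src = Arrays.asList(1, 2, 3, 4, 5).iterator();
        Scanner<Integer> s = new Scanner<Integer>() {
          public boolean next(List<Integer> out, int limit) {
            for (int i = 0; i < limit && src.hasNext(); i++) out.add(src.next());
            return src.hasNext();
          }
        };
        System.out.println(drain(s, 2));  // 5 -- the final short batch is not dropped
      }
    }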

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Thu Dec  6 05:59:46 2012
@@ -49,4 +49,5 @@ public class CompactionProgress {
   public float getProgressPct() {
     return currentCompactedKVs / totalCompactingKVs;
   }
+
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java Thu Dec  6 05:59:46 2012
@@ -25,6 +25,9 @@ import java.util.zip.Checksum;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ChecksumFactory;
+
 /**
  * Checksum types. The Checksum type is a one byte number
  * that stores a representation of the checksum algorithm
@@ -67,7 +70,7 @@ public enum ChecksumType {
         ctor = ChecksumFactory.newConstructor(PURECRC32);
         LOG.info("Checksum using " + PURECRC32);
       } catch (Exception e) {
-        LOG.trace(PURECRC32 + " not available.");
+        LOG.info(PURECRC32 + " not available.");
       }
       try {
         // The default checksum class name is java.util.zip.CRC32. 
@@ -77,7 +80,7 @@ public enum ChecksumType {
           LOG.info("Checksum can use " + JDKCRC);
         }
       } catch (Exception e) {
-        LOG.trace(JDKCRC + " not available.");
+        LOG.warn(JDKCRC + " not available. ",  e);
       }
     }
 
@@ -110,7 +113,7 @@ public enum ChecksumType {
         ctor = ChecksumFactory.newConstructor(PURECRC32C);
         LOG.info("Checksum can use " + PURECRC32C);
       } catch (Exception e) {
-        LOG.trace(PURECRC32C + " not available.");
+        LOG.info(PURECRC32C + " not available. ");
       }
     }
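
The blocks above probe for optional checksum implementations and only log when one is
missing. A generic, self-contained version of that probe (the class name here is just an
example; in HBase the reflective lookup is done by ChecksumFactory):

    class ChecksumProbeSketch {
      public static void main(String[] args) {
        String pureJavaCrc32 = "org.apache.hadoop.util.PureJavaCrc32";
        try {
          java.util.zip.Checksum c =
              (java.util.zip.Checksum) Class.forName(pureJavaCrc32).newInstance();
          c.update(new byte[] {1, 2, 3}, 0, 3);
          System.out.println("Checksum using " + pureJavaCrc32 + ": " + c.getValue());
        } catch (Exception e) {
          System.out.println(pureJavaCrc32 + " not available.");
        }
      }
    }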
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Dec  6 05:59:46 2012
@@ -151,7 +151,7 @@ public abstract class FSUtils {
    */
   public static FSDataOutputStream create(FileSystem fs, Path path,
       FsPermission perm, boolean overwrite) throws IOException {
-    LOG.debug("Creating file=" + path + " with permission=" + perm);
+    LOG.debug("Creating file:" + path + "with permission:" + perm);
 
     return fs.create(path, perm, overwrite,
         fs.getConf().getInt("io.file.buffer.size", 4096),
@@ -1013,25 +1013,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Given a particular region dir, return all the familydirs inside it
-   *
-   * @param fs A file system for the Path
-   * @param regionDir Path to a specific region directory
-   * @return List of paths to valid family directories in region dir.
-   * @throws IOException
-   */
-  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
-    // assumes we are in a region dir.
-    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
-    List<Path> familyDirs = new ArrayList<Path>(fds.length);
-    for (FileStatus fdfs: fds) {
-      Path fdPath = fdfs.getPath();
-      familyDirs.add(fdPath);
-    }
-    return familyDirs;
-  }
-
-  /**
    * Filter for HFiles that excludes reference files.
    */
   public static class HFileFilter implements PathFilter {

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Dec  6 05:59:46 2012
@@ -587,10 +587,8 @@ public class TestCompaction extends HBas
 
     List<StoreFile> storeFiles = store.getStorefiles();
     long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
-    Compactor tool = new Compactor(this.conf);
 
-    StoreFile.Writer compactedFile =
-      tool.compact(store, storeFiles, false, maxId);
+    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
 
     // Now lets corrupt the compacted file.
     FileSystem fs = FileSystem.get(conf);