You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2015/05/28 10:13:47 UTC

[6/6] hbase git commit: HBASE-13763 Handle the rename, annotation and typo stuff in MOB. (Jingcheng)

HBASE-13763 Handle the rename, annotation and typo stuff in MOB. (Jingcheng)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b31a6acf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b31a6acf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b31a6acf

Branch: refs/heads/hbase-11339
Commit: b31a6acf4c24d55ac74eaf4d669a33078e27ef89
Parents: 6388b3b
Author: anoopsjohn <an...@gmail.com>
Authored: Thu May 28 13:43:12 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Thu May 28 13:43:12 2015 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Admin.java   |   4 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |   4 +-
 .../src/main/resources/hbase-default.xml        |  30 +-
 .../regionserver/MetricsRegionServerSource.java |  16 +-
 .../MetricsRegionServerWrapper.java             |  16 +-
 .../MetricsRegionServerSourceImpl.java          |  16 +-
 .../hbase/chaos/actions/CompactMobAction.java   |   4 +-
 .../org/apache/hadoop/hbase/io/HFileLink.java   |   4 +-
 .../master/ExpiredMobFileCleanerChore.java      |   2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  61 +-
 .../hbase/master/MasterMobCompactionThread.java | 184 ++++
 .../master/MasterMobFileCompactionThread.java   | 184 ----
 .../hadoop/hbase/master/MasterRpcServices.java  |  18 +-
 .../hadoop/hbase/master/MobCompactionChore.java |  97 ++
 .../hbase/master/MobFileCompactionChore.java    |  97 --
 .../master/handler/DeleteTableHandler.java      |  14 +-
 .../master/procedure/DeleteTableProcedure.java  |   9 +-
 .../hadoop/hbase/mob/DefaultMobCompactor.java   | 304 ------
 .../hbase/mob/DefaultMobStoreCompactor.java     | 305 ++++++
 .../hbase/mob/DefaultMobStoreFlusher.java       |   4 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java |   2 +-
 .../apache/hadoop/hbase/mob/MobConstants.java   |  40 +-
 .../apache/hadoop/hbase/mob/MobFileCache.java   |   4 +-
 .../apache/hadoop/hbase/mob/MobFileName.java    |   4 +-
 .../apache/hadoop/hbase/mob/MobStoreEngine.java |   2 +-
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  62 +-
 .../mob/compactions/MobCompactionRequest.java   |  64 ++
 .../hbase/mob/compactions/MobCompactor.java     |  90 ++
 .../PartitionedMobCompactionRequest.java        | 146 +++
 .../compactions/PartitionedMobCompactor.java    | 655 +++++++++++++
 .../MobFileCompactionRequest.java               |  64 --
 .../mob/filecompactions/MobFileCompactor.java   |  90 --
 .../PartitionedMobFileCompactionRequest.java    | 146 ---
 .../PartitionedMobFileCompactor.java            | 636 -------------
 .../hbase/mob/mapreduce/MemStoreWrapper.java    |  17 +-
 .../hadoop/hbase/mob/mapreduce/SweepJob.java    |  22 +-
 .../hbase/mob/mapreduce/SweepReducer.java       |  81 +-
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     |   4 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |  40 +-
 .../MetricsRegionServerWrapperImpl.java         |  48 +-
 .../regionserver/ReversedMobStoreScanner.java   |   2 +-
 .../hbase/regionserver/StoreFileInfo.java       |   2 +-
 .../hadoop/hbase/snapshot/SnapshotManifest.java |   8 +-
 .../hbase/mob/compactions/TestMobCompactor.java | 924 +++++++++++++++++++
 .../TestPartitionedMobCompactionRequest.java    |  60 ++
 .../TestPartitionedMobCompactor.java            | 440 +++++++++
 .../filecompactions/TestMobFileCompactor.java   | 922 ------------------
 ...TestPartitionedMobFileCompactionRequest.java |  60 --
 .../TestPartitionedMobFileCompactor.java        | 436 ---------
 .../MetricsRegionServerWrapperStub.java         |   8 +-
 .../hbase/regionserver/TestMobCompaction.java   | 460 ---------
 .../regionserver/TestMobStoreCompaction.java    | 460 +++++++++
 hbase-shell/src/main/ruby/hbase/admin.rb        |   4 +-
 53 files changed, 3692 insertions(+), 3684 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 3c0383f..1e50525 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -1461,7 +1461,7 @@ public interface Admin extends Abortable, Closeable {
    * @throws IOException
    * @throws InterruptedException
    */
-  void compactMob(final TableName tableName) throws IOException,
+  void compactMobs(final TableName tableName) throws IOException,
     InterruptedException;
 
   /**
@@ -1482,7 +1482,7 @@ public interface Admin extends Abortable, Closeable {
    * @throws IOException
    * @throws InterruptedException
    */
-  void majorCompactMob(final TableName tableName) throws IOException,
+  void majorCompactMobs(final TableName tableName) throws IOException,
     InterruptedException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index caa12d2..4461e5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -4052,7 +4052,7 @@ public class HBaseAdmin implements Admin {
    * {@inheritDoc}
    */
   @Override
-  public void compactMob(final TableName tableName) throws IOException, InterruptedException {
+  public void compactMobs(final TableName tableName) throws IOException, InterruptedException {
     checkTableNameNotNull(tableName);
     compactMob(tableName, null, false);
   }
@@ -4073,7 +4073,7 @@ public class HBaseAdmin implements Admin {
    * {@inheritDoc}
    */
   @Override
-  public void majorCompactMob(final TableName tableName) throws IOException, InterruptedException {
+  public void majorCompactMobs(final TableName tableName) throws IOException, InterruptedException {
     checkTableNameNotNull(tableName);
     compactMob(tableName, null, true);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 540dded..66f5e73 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1605,54 +1605,54 @@ possible configurations would overwhelm and obscure the important.
     </description>
   </property>
   <property>
-    <name>hbase.mob.file.compaction.mergeable.threshold</name>
+    <name>hbase.mob.compaction.mergeable.threshold</name>
     <value>201326592</value>
     <description>
       If the size of a mob file is less than this value, it's regarded as a small
-      file and needs to be merged in mob file compaction. The default value is 192MB.
+      file and needs to be merged in mob compaction. The default value is 192MB.
     </description>
   </property>
   <property>
     <name>hbase.mob.delfile.max.count</name>
     <value>3</value>
     <description>
-      The max number of del files that is allowed in the mob file compaction.
-      In the mob file compaction, when the number of existing del files is larger than
+      The max number of del files that is allowed in the mob compaction.
+      In the mob compaction, when the number of existing del files is larger than
       this value, they are merged until the number of del files is not larger than this value.
       The default value is 3.
     </description>
   </property>
   <property>
-    <name>hbase.mob.file.compaction.batch.size</name>
+    <name>hbase.mob.compaction.batch.size</name>
     <value>100</value>
     <description>
-      The max number of the mob files that is allowed in a batch of the mob file compaction.
-      The mob file compaction merges the small mob files to bigger ones. If the number of the
+      The max number of the mob files that is allowed in a batch of the mob compaction.
+      The mob compaction merges the small mob files to bigger ones. If the number of the
       small files is very large, it could lead to a "too many opened file handlers" in the merge.
       And the merge has to be split into batches. This value limits the number of mob files
-      that are selected in a batch of the mob file compaction. The default value is 100.
+      that are selected in a batch of the mob compaction. The default value is 100.
     </description>
   </property>
   <property>
-    <name>hbase.mob.file.compaction.chore.period</name>
+    <name>hbase.mob.compaction.chore.period</name>
     <value>604800</value>
     <description>
-      The period that MobFileCompactionChore runs. The unit is second.
+      The period at which MobCompactionChore runs. The unit is seconds.
       The default value is one week.
     </description>
   </property>
   <property>
-    <name>hbase.mob.file.compactor.class</name>
-    <value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value>
+    <name>hbase.mob.compactor.class</name>
+    <value>org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor</value>
     <description>
-      Implementation of mob file compactor, the default one is PartitionedMobFileCompactor.
+      Implementation of mob compactor, the default one is PartitionedMobCompactor.
     </description>
   </property>
   <property>
-    <name>hbase.mob.file.compaction.threads.max</name>
+    <name>hbase.mob.compaction.threads.max</name>
     <value>1</value>
     <description>
-      The max number of threads used in MobFileCompactor.
+      The max number of threads used in MobCompactor.
     </description>
   </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 268d4af..80b5cd2 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -267,17 +267,17 @@ public interface MetricsRegionServerSource extends BaseSource {
   String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize";
   String MAJOR_COMPACTED_CELLS_SIZE_DESC =
       "The total amount of data processed during major compactions, in bytes";
-  String MOB_COMPACTED_INTO_MOB_CELLS_COUNT = "mobCompactedIntoMobCellsCount";
-  String MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC =
+  String CELLS_COUNT_COMPACTED_TO_MOB = "cellsCountCompactedToMob";
+  String CELLS_COUNT_COMPACTED_TO_MOB_DESC =
       "The number of cells moved to mob during compaction";
-  String MOB_COMPACTED_FROM_MOB_CELLS_COUNT = "mobCompactedFromMobCellsCount";
-  String MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC =
+  String CELLS_COUNT_COMPACTED_FROM_MOB = "cellsCountCompactedFromMob";
+  String CELLS_COUNT_COMPACTED_FROM_MOB_DESC =
       "The number of cells moved from mob during compaction";
-  String MOB_COMPACTED_INTO_MOB_CELLS_SIZE = "mobCompactedIntoMobCellsSize";
-  String MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC =
+  String CELLS_SIZE_COMPACTED_TO_MOB = "cellsSizeCompactedToMob";
+  String CELLS_SIZE_COMPACTED_TO_MOB_DESC =
       "The total amount of cells move to mob during compaction, in bytes";
-  String MOB_COMPACTED_FROM_MOB_CELLS_SIZE = "mobCompactedFromMobCellsSize";
-  String MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC =
+  String CELLS_SIZE_COMPACTED_FROM_MOB = "cellsSizeCompactedFromMob";
+  String CELLS_SIZE_COMPACTED_FROM_MOB_DESC =
       "The total amount of cells move from mob during compaction, in bytes";
   String MOB_FLUSH_COUNT = "mobFlushCount";
   String MOB_FLUSH_COUNT_DESC = "The number of the flushes in mob-enabled stores";

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index b609b4a..f2bd8ff 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -258,24 +258,24 @@ public interface MetricsRegionServerWrapper {
   long getMajorCompactedCellsSize();
 
   /**
-   * Gets the number of cells move to mob during compaction.
+   * Gets the number of cells moved to mob during compaction.
    */
-  long getMobCompactedIntoMobCellsCount();
+  long getCellsCountCompactedToMob();
 
   /**
-   * Gets the number of cells move from mob during compaction.
+   * Gets the number of cells moved from mob during compaction.
    */
-  long getMobCompactedFromMobCellsCount();
+  long getCellsCountCompactedFromMob();
 
   /**
-   * Gets the total amount of cells move to mob during compaction, in bytes.
+   * Gets the total amount of cells moved to mob during compaction, in bytes.
    */
-  long getMobCompactedIntoMobCellsSize();
+  long getCellsSizeCompactedToMob();
 
   /**
-   * Gets the total amount of cells move from mob during compaction, in bytes.
+   * Gets the total amount of cells moved from mob during compaction, in bytes.
    */
-  long getMobCompactedFromMobCellsSize();
+  long getCellsSizeCompactedFromMob();
 
   /**
    * Gets the number of the flushes in mob-enabled stores.

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index cadb574..26c55bb 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -259,14 +259,14 @@ public class MetricsRegionServerSourceImpl
           .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
               rsWrap.getMajorCompactedCellsSize())
 
-          .addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_COUNT, MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC),
-              rsWrap.getMobCompactedFromMobCellsCount())
-          .addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_COUNT, MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC),
-              rsWrap.getMobCompactedIntoMobCellsCount())
-          .addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_SIZE, MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC),
-              rsWrap.getMobCompactedFromMobCellsSize())
-          .addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_SIZE, MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC),
-              rsWrap.getMobCompactedIntoMobCellsSize())
+          .addCounter(Interns.info(CELLS_COUNT_COMPACTED_FROM_MOB, CELLS_COUNT_COMPACTED_FROM_MOB_DESC),
+              rsWrap.getCellsCountCompactedFromMob())
+          .addCounter(Interns.info(CELLS_COUNT_COMPACTED_TO_MOB, CELLS_COUNT_COMPACTED_TO_MOB_DESC),
+              rsWrap.getCellsCountCompactedToMob())
+          .addCounter(Interns.info(CELLS_SIZE_COMPACTED_FROM_MOB, CELLS_SIZE_COMPACTED_FROM_MOB_DESC),
+              rsWrap.getCellsSizeCompactedFromMob())
+          .addCounter(Interns.info(CELLS_SIZE_COMPACTED_TO_MOB, CELLS_SIZE_COMPACTED_TO_MOB_DESC),
+              rsWrap.getCellsSizeCompactedToMob())
           .addCounter(Interns.info(MOB_FLUSH_COUNT, MOB_FLUSH_COUNT_DESC),
               rsWrap.getMobFlushCount())
           .addCounter(Interns.info(MOB_FLUSHED_CELLS_COUNT, MOB_FLUSHED_CELLS_COUNT_DESC),

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactMobAction.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactMobAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactMobAction.java
index a349d3d..87c6dee 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactMobAction.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactMobAction.java
@@ -51,9 +51,9 @@ public class CompactMobAction extends Action {
     LOG.info("Performing action: Compact mob of table " + tableName + ", major=" + major);
     try {
       if (major) {
-        admin.majorCompactMob(tableName);
+        admin.majorCompactMobs(tableName);
       } else {
-        admin.compactMob(tableName);
+        admin.compactMobs(tableName);
       }
     } catch (Exception ex) {
       LOG.warn("Mob Compaction failed, might be caused by other chaos: " + ex.getMessage());

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
index a950dce..d070539 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
@@ -184,7 +184,9 @@ public class HFileLink extends FileLink {
   /**
    * @return the path of the mob hfiles.
    */
-  public Path getMobPath() { return this.mobPath; }
+  public Path getMobPath() {
+    return this.mobPath;
+  }
 
     /**
    * @param path Path to check.

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index 7b06462..7ca3362 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -92,7 +92,7 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
                   lock.release();
                 } catch (IOException e) {
                   LOG.error(
-                    "Fail to release the write lock for the table " + htd.getNameAsString(), e);
+                    "Fail to release the read lock for the table " + htd.getNameAsString(), e);
                 }
               }
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b3fc2a1..fc9aac7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -110,7 +110,6 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -280,13 +279,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
-  private MobFileCompactionChore mobFileCompactChore;
-  MasterMobFileCompactionThread mobFileCompactThread;
-  // used to synchronize the mobFileCompactionStates
-  private final IdLock mobFileCompactionLock = new IdLock();
-  // save the information of mob file compactions in tables.
+  private MobCompactionChore mobCompactChore;
+  private MasterMobCompactionThread mobCompactThread;
+  // used to synchronize the mobCompactionStates
+  private final IdLock mobCompactionLock = new IdLock();
+  // save the information of mob compactions in tables.
   // the key is table name, the value is the number of compactions in that table.
-  private Map<TableName, AtomicInteger> mobFileCompactionStates = Maps.newConcurrentMap();
+  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
 
   MasterCoprocessorHost cpHost;
 
@@ -796,9 +795,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
     getChoreService().scheduleChore(expiredMobFileCleanerChore);
 
-    this.mobFileCompactChore = new MobFileCompactionChore(this);
-    getChoreService().scheduleChore(mobFileCompactChore);
-    this.mobFileCompactThread = new MasterMobFileCompactionThread(this);
+    this.mobCompactChore = new MobCompactionChore(this);
+    getChoreService().scheduleChore(mobCompactChore);
+    this.mobCompactThread = new MasterMobCompactionThread(this);
 
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
@@ -1134,8 +1133,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (this.expiredMobFileCleanerChore != null) {
       this.expiredMobFileCleanerChore.cancel(true);
     }
-    if (this.mobFileCompactChore != null) {
-      this.mobFileCompactChore.cancel(true);
+    if (this.mobCompactChore != null) {
+      this.mobCompactChore.cancel(true);
     }
     if (this.balancerChore != null) {
       this.balancerChore.cancel(true);
@@ -1149,8 +1148,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (this.clusterStatusPublisherChore != null){
       clusterStatusPublisherChore.cancel(true);
     }
-    if (this.mobFileCompactThread != null) {
-      this.mobFileCompactThread.close();
+    if (this.mobCompactThread != null) {
+      this.mobCompactThread.close();
     }
   }
 
@@ -2453,50 +2452,62 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
    * @return If a given table is in mob file compaction now.
    */
   public CompactionState getMobCompactionState(TableName tableName) {
-    AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
+    AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
     if (compactionsCount != null && compactionsCount.get() != 0) {
       return CompactionState.MAJOR_AND_MINOR;
     }
     return CompactionState.NONE;
   }
 
-  public void reportMobFileCompactionStart(TableName tableName) throws IOException {
+  public void reportMobCompactionStart(TableName tableName) throws IOException {
     IdLock.Entry lockEntry = null;
     try {
-      lockEntry = mobFileCompactionLock.getLockEntry(tableName.hashCode());
-      AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
+      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
+      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
       if (compactionsCount == null) {
         compactionsCount = new AtomicInteger(0);
-        mobFileCompactionStates.put(tableName, compactionsCount);
+        mobCompactionStates.put(tableName, compactionsCount);
       }
       compactionsCount.incrementAndGet();
     } finally {
       if (lockEntry != null) {
-        mobFileCompactionLock.releaseLockEntry(lockEntry);
+        mobCompactionLock.releaseLockEntry(lockEntry);
       }
     }
   }
 
-  public void reportMobFileCompactionEnd(TableName tableName) throws IOException {
+  public void reportMobCompactionEnd(TableName tableName) throws IOException {
     IdLock.Entry lockEntry = null;
     try {
-      lockEntry = mobFileCompactionLock.getLockEntry(tableName.hashCode());
-      AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
+      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
+      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
       if (compactionsCount != null) {
         int count = compactionsCount.decrementAndGet();
         // remove the entry if the count is 0.
         if (count == 0) {
-          mobFileCompactionStates.remove(tableName);
+          mobCompactionStates.remove(tableName);
         }
       }
     } finally {
       if (lockEntry != null) {
-        mobFileCompactionLock.releaseLockEntry(lockEntry);
+        mobCompactionLock.releaseLockEntry(lockEntry);
       }
     }
   }
 
   /**
+   * Requests mob compaction.
+   * @param tableName The table to compact.
+   * @param columns The columns to compact.
+   * @param allFiles Whether to add all mob files into the compaction.
+   */
+  public void requestMobCompaction(TableName tableName,
+    List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
+    mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
+      tableLockManager, allFiles);
+  }
+
+  /**
    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
    * false is returned.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
new file mode 100644
index 0000000..d1f58ba
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The mob compaction thread used in {@link MasterRpcServices}
+ */
+@InterfaceAudience.Private
+public class MasterMobCompactionThread {
+  static final Log LOG = LogFactory.getLog(MasterMobCompactionThread.class);
+  private final HMaster master;
+  private final Configuration conf;
+  private final ExecutorService mobCompactorPool;
+  private final ExecutorService masterMobPool;
+
+  public MasterMobCompactionThread(HMaster master) {
+    this.master = master;
+    this.conf = master.getConfiguration();
+    final String n = Thread.currentThread().getName();
+    // this pool is used to run the mob compaction
+    this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
+      new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread t = new Thread(r);
+          t.setName(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime());
+          return t;
+        }
+      });
+    ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
+    // this pool is used in the mob compaction to compact the mob files by partitions
+    // in parallel
+    this.mobCompactorPool = MobUtils
+      .createMobCompactorThreadPool(master.getConfiguration());
+  }
+
+  /**
+   * Requests mob compaction
+   * @param conf The Configuration
+   * @param fs The file system
+   * @param tableName The table to compact
+   * @param columns The column descriptors
+   * @param tableLockManager The table lock manager
+   * @param allFiles Whether to add all mob files into the compaction.
+   */
+  public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
+    List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles)
+    throws IOException {
+    master.reportMobCompactionStart(tableName);
+    try {
+      masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager,
+        allFiles, mobCompactorPool));
+    } catch (RejectedExecutionException e) {
+      // in case the request is rejected by the pool
+      try {
+        master.reportMobCompactionEnd(tableName);
+      } catch (IOException e1) {
+        LOG.error("Failed to mark end of mob compation", e1);
+      }
+      throw e;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The mob compaction is requested for the columns " + columns
+        + " of the table " + tableName.getNameAsString());
+    }
+  }
+
+  private class CompactionRunner implements Runnable {
+    private FileSystem fs;
+    private TableName tableName;
+    private List<HColumnDescriptor> hcds;
+    private TableLockManager tableLockManager;
+    private boolean allFiles;
+    private ExecutorService pool;
+
+    public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
+      TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) {
+      super();
+      this.fs = fs;
+      this.tableName = tableName;
+      this.hcds = hcds;
+      this.tableLockManager = tableLockManager;
+      this.allFiles = allFiles;
+      this.pool = pool;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (HColumnDescriptor hcd : hcds) {
+          MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
+            allFiles);
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to perform the mob compaction", e);
+      } finally {
+        try {
+          master.reportMobCompactionEnd(tableName);
+        } catch (IOException e) {
+          LOG.error("Failed to mark end of mob compation", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Shuts down both thread pools so that no new mob compaction tasks are accepted.
+   */
+  private void interruptIfNecessary() {
+    mobCompactorPool.shutdown();
+    masterMobPool.shutdown();
+  }
+
+  /**
+   * Waits for both thread pools to terminate.
+   */
+  private void join() {
+    waitFor(mobCompactorPool, "Mob Compaction Thread");
+    waitFor(masterMobPool, "Region Server Mob Compaction Thread");
+  }
+
+  /**
+   * Closes the MasterMobCompactionThread.
+   */
+  public void close() {
+    interruptIfNecessary();
+    join();
+  }
+
+  /**
+   * Waits for the given thread pool to finish.
+   * @param t the thread pool to wait for
+   * @param name the name of the thread pool, used in log messages.
+   */
+  private void waitFor(ExecutorService t, String name) {
+    boolean done = false;
+    while (!done) {
+      try {
+        done = t.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.info("Waiting for " + name + " to finish...");
+        if (!done) {
+          t.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted waiting for " + name + " to finish...");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobFileCompactionThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobFileCompactionThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobFileCompactionThread.java
deleted file mode 100644
index f6810a1..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobFileCompactionThread.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * The mob file compaction thread used in {@link MasterRpcServices}
- */
-@InterfaceAudience.Private
-public class MasterMobFileCompactionThread {
-  static final Log LOG = LogFactory.getLog(MasterMobFileCompactionThread.class);
-  private final HMaster master;
-  private final Configuration conf;
-  private final ExecutorService mobFileCompactorPool;
-  private final ExecutorService masterMobPool;
-
-  public MasterMobFileCompactionThread(HMaster master) {
-    this.master = master;
-    this.conf = master.getConfiguration();
-    final String n = Thread.currentThread().getName();
-    // this pool is used to run the mob file compaction
-    this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
-      new SynchronousQueue<Runnable>(), new ThreadFactory() {
-        @Override
-        public Thread newThread(Runnable r) {
-          Thread t = new Thread(r);
-          t.setName(n + "-MasterMobFileCompaction-" + EnvironmentEdgeManager.currentTime());
-          return t;
-        }
-      });
-    ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
-    // this pool is used in the mob file compaction to compact the mob files by partitions
-    // in parallel
-    this.mobFileCompactorPool = MobUtils
-      .createMobFileCompactorThreadPool(master.getConfiguration());
-  }
-
-  /**
-   * Requests mob file compaction
-   * @param conf The Configuration
-   * @param fs The file system
-   * @param tableName The table the compact
-   * @param hcds The column descriptors
-   * @param tableLockManager The tableLock manager
-   * @param isForceAllFiles Whether add all mob files into the compaction.
-   */
-  public void requestMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName,
-    List<HColumnDescriptor> hcds, TableLockManager tableLockManager, boolean isForceAllFiles)
-    throws IOException {
-    master.reportMobFileCompactionStart(tableName);
-    try {
-      masterMobPool.execute(new CompactionRunner(fs, tableName, hcds, tableLockManager,
-        isForceAllFiles, mobFileCompactorPool));
-    } catch (RejectedExecutionException e) {
-      // in case the request is rejected by the pool
-      try {
-        master.reportMobFileCompactionEnd(tableName);
-      } catch (IOException e1) {
-        LOG.error("Failed to mark end of mob file compation", e1);
-      }
-      throw e;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The mob file compaction is requested for the columns " + hcds + " of the table "
-        + tableName.getNameAsString());
-    }
-  }
-
-  private class CompactionRunner implements Runnable {
-    private FileSystem fs;
-    private TableName tableName;
-    private List<HColumnDescriptor> hcds;
-    private TableLockManager tableLockManager;
-    private boolean isForceAllFiles;
-    private ExecutorService pool;
-
-    public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
-      TableLockManager tableLockManager, boolean isForceAllFiles, ExecutorService pool) {
-      super();
-      this.fs = fs;
-      this.tableName = tableName;
-      this.hcds = hcds;
-      this.tableLockManager = tableLockManager;
-      this.isForceAllFiles = isForceAllFiles;
-      this.pool = pool;
-    }
-
-    @Override
-    public void run() {
-      try {
-        for (HColumnDescriptor hcd : hcds) {
-          MobUtils.doMobFileCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
-            isForceAllFiles);
-        }
-      } catch (IOException e) {
-        LOG.error("Failed to perform the mob file compaction", e);
-      } finally {
-        try {
-          master.reportMobFileCompactionEnd(tableName);
-        } catch (IOException e) {
-          LOG.error("Failed to mark end of mob file compation", e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Only interrupt once it's done with a run through the work loop.
-   */
-  private void interruptIfNecessary() {
-    mobFileCompactorPool.shutdown();
-    masterMobPool.shutdown();
-  }
-
-  /**
-   * Wait for all the threads finish.
-   */
-  private void join() {
-    waitFor(mobFileCompactorPool, "Mob file Compaction Thread");
-    waitFor(masterMobPool, "Region Server Mob File Compaction Thread");
-  }
-
-  /**
-   * Closes the MasterMobFileCompactionThread.
-   */
-  public void close() {
-    interruptIfNecessary();
-    join();
-  }
-
-  /**
-   * Wait for thread finish.
-   * @param t the thread to wait
-   * @param name the thread name.
-   */
-  private void waitFor(ExecutorService t, String name) {
-    boolean done = false;
-    while (!done) {
-      try {
-        done = t.awaitTermination(60, TimeUnit.SECONDS);
-        LOG.info("Waiting for " + name + " to finish...");
-        if (!done) {
-          t.shutdownNow();
-        }
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted waiting for " + name + " to finish...");
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 4b8d6b8..7d025db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -1428,7 +1428,7 @@ public class MasterRpcServices extends RSRpcServices
     if (!master.getTableStateManager().isTableState(tableName, TableState.State.ENABLED)) {
       throw new DoNotRetryIOException("Table " + tableName + " is not enabled");
     }
-    boolean isForceAllFiles = false;
+    boolean allFiles = false;
     List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>();
     HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies();
     byte[] family = null;
@@ -1437,8 +1437,8 @@ public class MasterRpcServices extends RSRpcServices
       for (HColumnDescriptor hcd : hcds) {
         if (Bytes.equals(family, hcd.getName())) {
           if (!hcd.isMobEnabled()) {
-            LOG.error("Column family " + hcd.getName() + " is not a mob column family");
-            throw new DoNotRetryIOException("Column family " + hcd.getName()
+            LOG.error("Column family " + hcd.getNameAsString() + " is not a mob column family");
+            throw new DoNotRetryIOException("Column family " + hcd.getNameAsString()
                     + " is not a mob column family");
           }
           compactedColumns.add(hcd);
@@ -1452,21 +1452,19 @@ public class MasterRpcServices extends RSRpcServices
       }
     }
     if (compactedColumns.isEmpty()) {
-      LOG.error("No mob column families are assigned in the mob file compaction");
+      LOG.error("No mob column families are assigned in the mob compaction");
       throw new DoNotRetryIOException(
-              "No mob column families are assigned in the mob file compaction");
+              "No mob column families are assigned in the mob compaction");
     }
     if (request.hasMajor() && request.getMajor()) {
-      isForceAllFiles = true;
+      allFiles = true;
     }
     String familyLogMsg = (family != null) ? Bytes.toString(family) : "";
     if (LOG.isTraceEnabled()) {
-      LOG.trace("User-triggered mob file compaction requested for table: "
+      LOG.trace("User-triggered mob compaction requested for table: "
               + tableName.getNameAsString() + " for column family: " + familyLogMsg);
     }
-    master.mobFileCompactThread.requestMobFileCompaction(master.getConfiguration(),
-            master.getFileSystem(), tableName, compactedColumns,
-            master.getTableLockManager(), isForceAllFiles);
+    master.requestMobCompaction(tableName, compactedColumns, allFiles);
     return CompactRegionResponse.newBuilder().build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
new file mode 100644
index 0000000..28af3eb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
@@ -0,0 +1,97 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * The Class MobCompactionChore for running compaction regularly to merge small mob files.
+ */
+@InterfaceAudience.Private
+public class MobCompactionChore extends ScheduledChore {
+
+  private static final Log LOG = LogFactory.getLog(MobCompactionChore.class);
+  private HMaster master;
+  private TableLockManager tableLockManager;
+  private ExecutorService pool;
+
+  public MobCompactionChore(HMaster master) {
+    super(master.getServerName() + "-MobCompactionChore", master, master.getConfiguration()
+      .getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
+        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD), master.getConfiguration().getInt(
+      MobConstants.MOB_COMPACTION_CHORE_PERIOD,
+      MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD), TimeUnit.SECONDS);
+    this.master = master;
+    this.tableLockManager = master.getTableLockManager();
+    this.pool = MobUtils.createMobCompactorThreadPool(master.getConfiguration());
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      TableDescriptors htds = master.getTableDescriptors();
+      Map<String, HTableDescriptor> map = htds.getAll();
+      for (HTableDescriptor htd : map.values()) {
+        if (!master.getTableStateManager().isTableState(htd.getTableName(),
+          TableState.State.ENABLED)) {
+          continue;
+        }
+        boolean reported = false;
+        try {
+          for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+            if (!hcd.isMobEnabled()) {
+              continue;
+            }
+            if (!reported) {
+              master.reportMobCompactionStart(htd.getTableName());
+              reported = true;
+            }
+            MobUtils.doMobCompaction(master.getConfiguration(), master.getFileSystem(),
+              htd.getTableName(), hcd, pool, tableLockManager, false);
+          }
+        } finally {
+          if (reported) {
+            master.reportMobCompactionEnd(htd.getTableName());
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to compact mob files", e);
+    }
+  }
+
+  @Override
+  protected void cleanup() {
+    super.cleanup();
+    pool.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
deleted file mode 100644
index 13c52f0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobUtils;
-
-/**
- * The Class MobFileCompactChore for running compaction regularly to merge small mob files.
- */
-@InterfaceAudience.Private
-public class MobFileCompactionChore extends ScheduledChore {
-
-  private static final Log LOG = LogFactory.getLog(MobFileCompactionChore.class);
-  private HMaster master;
-  private TableLockManager tableLockManager;
-  private ExecutorService pool;
-
-  public MobFileCompactionChore(HMaster master) {
-    super(master.getServerName() + "-MobFileCompactChore", master, master.getConfiguration()
-      .getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
-        MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master.getConfiguration().getInt(
-      MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
-      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), TimeUnit.SECONDS);
-    this.master = master;
-    this.tableLockManager = master.getTableLockManager();
-    this.pool = MobUtils.createMobFileCompactorThreadPool(master.getConfiguration());
-  }
-
-  @Override
-  protected void chore() {
-    try {
-      TableDescriptors htds = master.getTableDescriptors();
-      Map<String, HTableDescriptor> map = htds.getAll();
-      for (HTableDescriptor htd : map.values()) {
-        if (!master.getTableStateManager().isTableState(htd.getTableName(),
-          TableState.State.ENABLED)) {
-          continue;
-        }
-        boolean reported = false;
-        try {
-          for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
-            if (!hcd.isMobEnabled()) {
-              continue;
-            }
-            if (!reported) {
-              master.reportMobFileCompactionStart(htd.getTableName());
-              reported = true;
-            }
-            MobUtils.doMobFileCompaction(master.getConfiguration(), master.getFileSystem(),
-              htd.getTableName(), hcd, pool, tableLockManager, false);
-          }
-        } finally {
-          if (reported) {
-            master.reportMobFileCompactionEnd(htd.getTableName());
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Fail to clean the expired mob files", e);
-    }
-  }
-
-  @Override
-  protected void cleanup() {
-    super.cleanup();
-    pool.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index cbff5dd..6069eba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 @InterfaceAudience.Private
@@ -73,8 +74,8 @@ public class DeleteTableHandler extends TableEventHandler {
     long waitTime = server.getConfiguration().
       getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
     for (HRegionInfo region : regions) {
-      long done = System.currentTimeMillis() + waitTime;
-      while (System.currentTimeMillis() < done) {
+      long done = EnvironmentEdgeManager.currentTime() + waitTime;
+      while (EnvironmentEdgeManager.currentTime() < done) {
         if (states.isRegionInState(region, State.FAILED_OPEN)) {
           am.regionOffline(region);
         }
@@ -192,14 +193,7 @@ public class DeleteTableHandler extends TableEventHandler {
       }
 
       // Archive the mob data if there is a mob-enabled column
-      HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
-      boolean hasMob = false;
-      for (HColumnDescriptor hcd : hcds) {
-        if (hcd.isMobEnabled()) {
-          hasMob = true;
-          break;
-        }
-      }
+      boolean hasMob = MobUtils.hasMobColumns(hTableDescriptor);
       Path mobTableDir = null;
       if (hasMob) {
         // Archive mob data

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index dfc5762..0e561d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -344,14 +344,7 @@ public class DeleteTableProcedure
 
     // Archive the mob data if there is a mob-enabled column
     HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(tableName);
-    HColumnDescriptor[] hcds = htd.getColumnFamilies();
-    boolean hasMob = false;
-    for (HColumnDescriptor hcd : hcds) {
-      if (hcd.isMobEnabled()) {
-        hasMob = true;
-        break;
-      }
-    }
+    boolean hasMob = MobUtils.hasMobColumns(htd);
     Path mobTableDir = null;
     if (hasMob) {
       // Archive mob data

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
deleted file mode 100644
index d54dca4..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.*;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Compact passed set of files in the mob-enabled column family.
- */
-@InterfaceAudience.Private
-public class DefaultMobCompactor extends DefaultCompactor {
-
-  private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
-  private long mobSizeThreshold;
-  private HMobStore mobStore;
-  public DefaultMobCompactor(Configuration conf, Store store) {
-    super(conf, store);
-    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
-    // During the compaction, the compactor reads the cells from the mob files and
-    // probably creates new mob files. All of these operations are included in HMobStore,
-    // so we need to cast the Store to HMobStore.
-    if (!(store instanceof HMobStore)) {
-      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
-    }
-    mobStore = (HMobStore) store;
-    mobSizeThreshold = store.getFamily().getMobThreshold();
-  }
-
-  /**
-   * Creates a writer for a new file in a temporary directory.
-   * @param fd The file details.
-   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
-   * @return Writer for a new StoreFile in the tmp dir.
-   * @throws IOException
-   */
-  @Override
-  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
-    // make this writer with tags always because of possible new cells with tags.
-    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-        true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
-    return writer;
-  }
-
-  @Override
-  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
-      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
-    Scan scan = new Scan();
-    scan.setMaxVersions(store.getFamily().getMaxVersions());
-    if (scanType == ScanType.COMPACT_DROP_DELETES) {
-      scanType = ScanType.COMPACT_RETAIN_DELETES;
-      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
-          scanType, smallestReadPoint, earliestPutTs, true);
-    } else {
-      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
-          scanType, smallestReadPoint, earliestPutTs, false);
-    }
-  }
-
-  // TODO refactor to take advantage of the throughput controller.
-
-  /**
-   * Performs compaction on a column family with the mob flag enabled.
-   * This is for when the mob threshold size has changed or if the mob
-   * column family mode has been toggled via an alter table statement.
-   * Compacts the files by the following rules.
-   * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
-   * <ol>
-   * <li>
-   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
-   * directly copy the (with mob tag) cell into the new store file.
-   * </li>
-   * <li>
-   * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
-   * the new store file.
-   * </li>
-   * </ol>
-   * 2. If the cell doesn't have a reference tag.
-   * <ol>
-   * <li>
-   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
-   * write this cell to a mob file, and write the path of this mob file to the store file.
-   * </li>
-   * <li>
-   * Otherwise, directly write this cell into the store file.
-   * </li>
-   * </ol>
-   * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
-   * which could output the normal cells and delete markers together when required.
-   * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
-   * deleted or old version mob refs, and the delete markers are written to a del file with the
-   * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
-   * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
-   * mob files. When the small mob files are merged into bigger ones, the del file is added into
-   * the scanner to filter the deleted cells.
-   * @param fd File details
-   * @param scanner Where to read from.
-   * @param writer Where to write to.
-   * @param smallestReadPoint Smallest read point.
-   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
-   * @param major Is a major compaction.
-   * @return Whether compaction ended; false if it was interrupted for any reason.
-   */
-  @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
-      long smallestReadPoint, boolean cleanSeqId,
-      CompactionThroughputController throughputController,  boolean major) throws IOException {
-    if (!(scanner instanceof MobCompactionStoreScanner)) {
-      throw new IllegalArgumentException(
-          "The scanner should be an instance of MobCompactionStoreScanner");
-    }
-    MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
-    int bytesWritten = 0;
-    // Since scanner.next() can return 'false' but still be delivering data,
-    // we have to use a do/while loop.
-    List<Cell> cells = new ArrayList<Cell>();
-    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-    int closeCheckInterval = HStore.getCloseCheckInterval();
-    boolean hasMore;
-    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
-    byte[] fileName = null;
-    StoreFile.Writer mobFileWriter = null;
-    StoreFile.Writer delFileWriter = null;
-    long mobCells = 0;
-    long deleteMarkersCount = 0;
-    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
-            .getName());
-    long mobCompactedIntoMobCellsCount = 0;
-    long mobCompactedFromMobCellsCount = 0;
-    long mobCompactedIntoMobCellsSize = 0;
-    long mobCompactedFromMobCellsSize = 0;
-    try {
-      try {
-        // If the mob file writer could not be created, directly write the cell to the store file.
-        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
-            store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
-        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
-      } catch (IOException e) {
-        LOG.error(
-            "Fail to create mob writer, "
-                + "we will continue the compaction by writing MOB cells directly in store files",
-            e);
-      }
-      delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
-          store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
-      ScannerContext scannerContext =
-              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
-
-
-      do {
-        hasMore = compactionScanner.next(cells, scannerContext);
-        // output to writer:
-        for (Cell c : cells) {
-          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
-            CellUtil.setSequenceId(c, 0);
-          }
-          if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
-            delFileWriter.append(c);
-            deleteMarkersCount++;
-          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
-            // If the mob file writer is null or the kv type is not put, directly write the cell
-            // to the store file.
-            writer.append(c);
-          } else if (MobUtils.isMobReferenceCell(c)) {
-            if (MobUtils.hasValidMobRefCellValue(c)) {
-              int size = MobUtils.getMobValueLength(c);
-              if (size > mobSizeThreshold) {
-                // If the value size is larger than the threshold, it's regarded as a mob. Since
-                // its value is already in the mob file, directly write this cell to the store file
-                writer.append(c);
-              } else {
-                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
-                // the mob cell from the mob file, and write it back to the store file.
-                Cell mobCell = mobStore.resolve(c, false);
-                if (mobCell.getValueLength() != 0) {
-                  // put the mob data back to the store file
-                  CellUtil.setSequenceId(mobCell, c.getSequenceId());
-                  writer.append(mobCell);
-                  mobCompactedFromMobCellsCount++;
-                  mobCompactedFromMobCellsSize += mobCell.getValueLength();
-                } else {
-                  // If the value of a file is empty, there might be issues when retrieving,
-                  // directly write the cell to the store file, and leave it to be handled by the
-                  // next compaction.
-                  writer.append(c);
-                }
-              }
-            } else {
-              LOG.warn("The value format of the KeyValue " + c
-                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
-              writer.append(c);
-            }
-          } else if (c.getValueLength() <= mobSizeThreshold) {
-            // If the value size of a cell is not larger than the threshold, directly write it to
-            // the store file.
-            writer.append(c);
-          } else {
-            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
-            // write this cell to a mob file, and write the path to the store file.
-            mobCells++;
-            // append the original keyValue in the mob file.
-            mobFileWriter.append(c);
-            KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
-            // write the cell whose value is the path of a mob file to the store file.
-            writer.append(reference);
-            mobCompactedIntoMobCellsCount++;
-            mobCompactedIntoMobCellsSize += c.getValueLength();
-          }
-          ++progress.currentCompactedKVs;
-
-          // check periodically to see if a system stop is requested
-          if (closeCheckInterval > 0) {
-            bytesWritten += KeyValueUtil.length(c);
-            if (bytesWritten > closeCheckInterval) {
-              bytesWritten = 0;
-              if (!store.areWritesEnabled()) {
-                progress.cancel();
-                return false;
-              }
-            }
-          }
-        }
-        cells.clear();
-      } while (hasMore);
-    } finally {
-      if (mobFileWriter != null) {
-        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
-        mobFileWriter.close();
-      }
-      if (delFileWriter != null) {
-        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
-        delFileWriter.close();
-      }
-    }
-    if (mobFileWriter != null) {
-      if (mobCells > 0) {
-        // If the mob file is not empty, commit it.
-        mobStore.commitFile(mobFileWriter.getPath(), path);
-      } else {
-        try {
-          // If the mob file is empty, delete it instead of committing.
-          store.getFileSystem().delete(mobFileWriter.getPath(), true);
-        } catch (IOException e) {
-          LOG.error("Fail to delete the temp mob file", e);
-        }
-      }
-    }
-    if (delFileWriter != null) {
-      if (deleteMarkersCount > 0) {
-        // If the del file is not empty, commit it.
-        // If the commit fails, the compaction is re-performed again.
-        mobStore.commitFile(delFileWriter.getPath(), path);
-      } else {
-        try {
-          // If the del file is empty, delete it instead of committing.
-          store.getFileSystem().delete(delFileWriter.getPath(), true);
-        } catch (IOException e) {
-          LOG.error("Fail to delete the temp del file", e);
-        }
-      }
-    }
-    mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount);
-    mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount);
-    mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize);
-    mobStore.updateMobCompactedIntoMobCellsSize(mobCompactedIntoMobCellsSize);
-    progress.complete();
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
new file mode 100644
index 0000000..fbcff85
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Compact passed set of files in the mob-enabled column family.
+ */
+@InterfaceAudience.Private
+public class DefaultMobStoreCompactor extends DefaultCompactor {
+
+  private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
+  private long mobSizeThreshold;
+  private HMobStore mobStore;
+  public DefaultMobStoreCompactor(Configuration conf, Store store) {
+    super(conf, store);
+    // The mob cells reside in the mob-enabled column family, which is held by an HMobStore.
+    // During the compaction, the compactor reads cells from the mob files and may create
+    // new mob files. All of these operations are provided by HMobStore, so the Store is cast
+    // to HMobStore below; any other Store implementation is rejected up front.
+    if (!(store instanceof HMobStore)) {
+      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
+    }
+    mobStore = (HMobStore) store;
+    mobSizeThreshold = store.getFamily().getMobThreshold();
+  }
+
+  /**
+   * Creates a writer for a new file in a temporary directory.
+   * @param fd The file details (maxKeyCount and maxMVCCReadpoint are read here).
+   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @return Writer for a new StoreFile in the tmp dir.
+   * @throws IOException if the store fails to create the temporary writer.
+   */
+  @Override
+  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+    // Always create this writer with tags, since the compaction may emit new cells with tags.
+    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+        true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
+    return writer;
+  }
+
+  @Override
+  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getFamily().getMaxVersions());
+    if (scanType == ScanType.COMPACT_DROP_DELETES) {
+      // In major compaction, we need to write the delete markers to del files, so we have to
+      // retain them while scanning.
+      scanType = ScanType.COMPACT_RETAIN_DELETES;
+      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
+          scanType, smallestReadPoint, earliestPutTs, true);
+    } else {
+      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
+          scanType, smallestReadPoint, earliestPutTs, false);
+    }
+  }
+
+  // TODO refactor to take advantage of the throughput controller.
+
+  /**
+   * Performs compaction on a column family with the mob flag enabled.
+   * This is for when the mob threshold size has changed or if the mob
+   * column family mode has been toggled via an alter table statement.
+   * Compacts the files by the following rules.
+   * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
+   * <ol>
+   * <li>
+   * If the referenced mob value is larger than the threshold, the cell is still regarded as a
+   * mob; directly copy the reference cell (with its mob tag) into the new store file.
+   * </li>
+   * <li>
+   * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
+   * the new store file.
+   * </li>
+   * </ol>
+   * 2. If the cell doesn't have a reference tag.
+   * <ol>
+   * <li>
+   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
+   * write this cell to a mob file, and write the path of this mob file to the store file.
+   * </li>
+   * <li>
+   * Otherwise, directly write this cell into the store file.
+   * </li>
+   * </ol>
+   * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
+   * which could output the normal cells and delete markers together when required.
+   * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
+   * deleted or old version mob refs, and the delete markers are written to a del file with the
+   * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
+   * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
+   * mob files. When the small mob files are merged into bigger ones, the del file is added into
+   * the scanner to filter the deleted cells.
+   * @param fd File details
+   * @param scanner Where to read from; must be a {@link MobCompactionStoreScanner}.
+   * @param writer Where to write to.
+   * @param smallestReadPoint Smallest read point.
+   * @param cleanSeqId When true, reset to 0 any seqId (formerly mvcc) {@code <=} smallestReadPoint
+   * @param throughputController The compaction throughput controller; currently unused here.
+   * @param major Is a major compaction.
+   * @return True if compaction completed; false if cancelled because store writes are disabled.
+   */
+  @Override
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+      long smallestReadPoint, boolean cleanSeqId,
+      CompactionThroughputController throughputController,  boolean major) throws IOException {
+    if (!(scanner instanceof MobCompactionStoreScanner)) {
+      throw new IllegalArgumentException(
+          "The scanner should be an instance of MobCompactionStoreScanner");
+    }
+    MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
+    int bytesWritten = 0;
+    // Since scanner.next() can return 'false' but still be delivering data,
+    // we have to use a do/while loop.
+    List<Cell> cells = new ArrayList<Cell>();
+    // Scan batches are capped via compactionKVMax ("hbase.hstore.compaction.kv.max") to avoid OOME
+    int closeCheckInterval = HStore.getCloseCheckInterval();
+    boolean hasMore;
+    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
+    byte[] fileName = null;
+    StoreFile.Writer mobFileWriter = null;
+    StoreFile.Writer delFileWriter = null;
+    long mobCells = 0;
+    long deleteMarkersCount = 0;
+    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+            .getName());
+    long cellsCountCompactedToMob = 0;
+    long cellsCountCompactedFromMob = 0;
+    long cellsSizeCompactedToMob = 0;
+    long cellsSizeCompactedFromMob = 0;
+    try {
+      try {
+        // If the mob file writer cannot be created, cells are written to the store file instead.
+        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+            store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+      } catch (IOException e) {
+        LOG.error(
+            "Failed to create mob writer, "
+                + "we will continue the compaction by writing MOB cells directly in store files",
+            e);
+      }
+      delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+          store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+      ScannerContext scannerContext =
+              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+      do {
+        hasMore = compactionScanner.next(cells, scannerContext);
+        // output to writer:
+        for (Cell c : cells) {
+          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
+            CellUtil.setSequenceId(c, 0);
+          }
+          if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
+            delFileWriter.append(c);
+            deleteMarkersCount++;
+          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
+            // If the mob file writer is null or the kv type is not put, directly write the cell
+            // to the store file.
+            writer.append(c);
+          } else if (MobUtils.isMobReferenceCell(c)) {
+            if (MobUtils.hasValidMobRefCellValue(c)) {
+              int size = MobUtils.getMobValueLength(c);
+              if (size > mobSizeThreshold) {
+                // If the value size is larger than the threshold, it's regarded as a mob. Since
+                // its value is already in the mob file, directly write this cell to the store file
+                writer.append(c);
+              } else {
+                // If the value is not larger than the threshold, it's not regarded as a mob.
+                // Retrieve the mob cell from the mob file, and write it back to the store file.
+                Cell mobCell = mobStore.resolve(c, false);
+                if (mobCell.getValueLength() != 0) {
+                  // put the mob data back to the store file
+                  CellUtil.setSequenceId(mobCell, c.getSequenceId());
+                  writer.append(mobCell);
+                  cellsCountCompactedFromMob++;
+                  cellsSizeCompactedFromMob += mobCell.getValueLength();
+                } else {
+                  // If the resolved mob cell's value is empty, there might be issues when
+                  // retrieving it; directly write the reference cell to the store file, and
+                  // leave it to be handled by the next compaction.
+                  writer.append(c);
+                }
+              }
+            } else {
+              LOG.warn("The value format of the KeyValue " + c
+                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
+              writer.append(c);
+            }
+          } else if (c.getValueLength() <= mobSizeThreshold) {
+            // If the value size of a cell is not larger than the threshold, directly write it to
+            // the store file.
+            writer.append(c);
+          } else {
+            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
+            // write this cell to a mob file, and write the path to the store file.
+            mobCells++;
+            // append the original keyValue in the mob file.
+            mobFileWriter.append(c);
+            KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
+            // write the cell whose value is the path of a mob file to the store file.
+            writer.append(reference);
+            cellsCountCompactedToMob++;
+            cellsSizeCompactedToMob += c.getValueLength();
+          }
+          ++progress.currentCompactedKVs;
+
+          // check periodically to see if a system stop is requested
+          if (closeCheckInterval > 0) {
+            bytesWritten += KeyValueUtil.length(c);
+            if (bytesWritten > closeCheckInterval) {
+              bytesWritten = 0;
+              if (!store.areWritesEnabled()) {
+                progress.cancel();
+                return false;
+              }
+            }
+          }
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      if (mobFileWriter != null) {
+        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
+        mobFileWriter.close();
+      }
+      if (delFileWriter != null) {
+        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
+        delFileWriter.close();
+      }
+    }
+    if (mobFileWriter != null) {
+      if (mobCells > 0) {
+        // If the mob file is not empty, commit it.
+        mobStore.commitFile(mobFileWriter.getPath(), path);
+      } else {
+        try {
+          // If the mob file is empty, delete it instead of committing.
+          store.getFileSystem().delete(mobFileWriter.getPath(), true);
+        } catch (IOException e) {
+          LOG.error("Failed to delete the temp mob file", e);
+        }
+      }
+    }
+    if (delFileWriter != null) {
+      if (deleteMarkersCount > 0) {
+        // If the del file is not empty, commit it.
+        // If the commit fails, the compaction is performed again.
+        mobStore.commitFile(delFileWriter.getPath(), path);
+      } else {
+        try {
+          // If the del file is empty, delete it instead of committing.
+          store.getFileSystem().delete(delFileWriter.getPath(), true);
+        } catch (IOException e) {
+          LOG.error("Failed to delete the temp del file", e);
+        }
+      }
+    }
+    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
+    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
+    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
+    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
+    progress.complete();
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 608f4e2..47a0acf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -121,7 +121,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
     } finally {
       scanner.close();
     }
-    LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
+    LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
         + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
@@ -213,7 +213,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
         // If the mob file is empty, delete it instead of committing.
         store.getFileSystem().delete(mobFileWriter.getPath(), true);
       } catch (IOException e) {
-        LOG.error("Fail to delete the temp mob file", e);
+        LOG.error("Failed to delete the temp mob file", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
index 7f38c44..703ebd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -112,7 +112,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
       try {
         admin.close();
       } catch (IOException e) {
-        LOG.error("Fail to close the HBaseAdmin.", e);
+        LOG.error("Failed to close the HBaseAdmin.", e);
       }
     }
   }