Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/19 12:11:02 UTC

[2/2] git commit: HBASE-11644 External MOB compaction tools (Jingcheng Du)

HBASE-11644 External MOB compaction tools (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/84e957c8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/84e957c8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/84e957c8

Branch: refs/heads/hbase-11339
Commit: 84e957c875ae971578a5b147775445368ea26188
Parents: 9cf46dc
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Sep 19 03:02:06 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Sep 19 03:06:33 2014 -0700

----------------------------------------------------------------------
 .../src/main/resources/hbase-default.xml        |  34 ++
 .../master/ExpiredMobFileCleanerChore.java      |  70 +++
 .../org/apache/hadoop/hbase/master/HMaster.java |   7 +
 .../hadoop/hbase/mob/DefaultMobCompactor.java   |   2 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java | 120 ++++
 .../apache/hadoop/hbase/mob/MobConstants.java   |  22 +
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 285 +++++++++-
 .../apache/hadoop/hbase/mob/MobZookeeper.java   | 270 +++++++++
 .../hbase/mob/mapreduce/MemStoreWrapper.java    | 184 +++++++
 .../mapreduce/MobFilePathHashPartitioner.java   |  41 ++
 .../hadoop/hbase/mob/mapreduce/SweepJob.java    | 550 +++++++++++++++++++
 .../mob/mapreduce/SweepJobNodeTracker.java      |  74 +++
 .../hadoop/hbase/mob/mapreduce/SweepMapper.java |  84 +++
 .../hbase/mob/mapreduce/SweepReducer.java       | 506 +++++++++++++++++
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     | 108 ++++
 .../hadoop/hbase/regionserver/HMobStore.java    | 180 +++++-
 .../regionserver/MobReferenceOnlyFilter.java    |  42 ++
 .../hbase/mob/TestExpiredMobFileCleaner.java    | 180 ++++++
 .../hbase/mob/mapreduce/TestMobSweepJob.java    | 168 ++++++
 .../hbase/mob/mapreduce/TestMobSweepMapper.java | 100 ++++
 .../mob/mapreduce/TestMobSweepReducer.java      | 207 +++++++
 .../hbase/mob/mapreduce/TestMobSweeper.java     | 306 +++++++++++
 .../hbase/regionserver/TestMobCompaction.java   |  26 +-
 .../hbase/regionserver/TestMobStoreScanner.java |  72 +++
 24 files changed, 3594 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 872bbc5..647defd 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1483,4 +1483,38 @@ possible configurations would overwhelm and obscure the important.
       The default value is 0.5f.
     </description>
   </property>
+  <property>
+    <name>hbase.mob.sweep.tool.compaction.ratio</name>
+    <value>0.5f</value>
+    <description>
+      If too many cells are deleted in a mob file, it's regarded
+      as an invalid file and needs to be merged.
+      If existingCellsSize/mobFileSize is less than this ratio, the file
+      is regarded as invalid. The default value is 0.5f.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.sweep.tool.compaction.mergeable.size</name>
+    <value>134217728</value>
+    <description>
+      If the size of a mob file is less than this value, it's regarded as a small
+      file and needs to be merged. The default value is 128MB.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name>
+    <value>134217728</value>
+    <description>
+      The flush size for the memstore used by the sweep job. Each sweep reducer owns such a memstore.
+      The default value is 128MB.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.mob.ttl.cleaner.period</name>
+    <value>86400000</value>
+    <description>
+      The period, in milliseconds, at which the ExpiredMobFileCleanerChore runs.
+      The default value is one day.
+    </description>
+  </property>
 </configuration>
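
For reference, a minimal sketch of how these settings can be read through the
standard Configuration API. The class name is illustrative; the keys and
defaults mirror the MobConstants added later in this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.MobConstants;

    public class SweepConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Ratio below which a mob file is treated as invalid and merged (default 0.5f).
        float ratio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
            MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
        // Mob files smaller than this are treated as small and merged (default 128MB).
        long mergeableSize = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
            MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
        // Flush size of the memstore owned by each sweep reducer (default 128MB).
        long flushSize = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
            MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
        // Period of the ExpiredMobFileCleanerChore, in milliseconds (default one day).
        int cleanerPeriod = conf.getInt(MobConstants.MOB_CLEANER_PERIOD,
            MobConstants.DEFAULT_MOB_CLEANER_PERIOD);
        System.out.println(ratio + ", " + mergeableSize + ", " + flushSize + ", " + cleanerPeriod);
      }
    }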

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
new file mode 100644
index 0000000..98fe236
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * A chore that runs the ExpiredMobFileCleaner regularly to remove the expired
+ * mob files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleanerChore extends Chore {
+
+  private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class);
+  private final HMaster master;
+  private ExpiredMobFileCleaner cleaner;
+
+  public ExpiredMobFileCleanerChore(HMaster master) {
+    super(master.getServerName() + "-ExpiredMobFileCleanerChore", master.getConfiguration().getInt(
+        MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master);
+    this.master = master;
+    cleaner = new ExpiredMobFileCleaner();
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      TableDescriptors htds = master.getTableDescriptors();
+      Map<String, HTableDescriptor> map = htds.getAll();
+      for (HTableDescriptor htd : map.values()) {
+        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+          if (MobUtils.isMobFamily(hcd) && hcd.getMinVersions() == 0) {
+            cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Fail to clean the expired mob files", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 714b5a8..4ff3592 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -208,6 +208,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   CatalogJanitor catalogJanitorChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
+  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
 
   MasterCoprocessorHost cpHost;
 
@@ -610,6 +611,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     // master initialization. See HBASE-5916.
     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
 
+    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
+    Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
+
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
       try {
@@ -856,6 +860,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   }
 
   private void stopChores() {
+    if (this.expiredMobFileCleanerChore != null) {
+      this.expiredMobFileCleanerChore.interrupt();
+    }
     if (this.balancerChore != null) {
       this.balancerChore.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
index 5f13502..fd35a15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@ -152,7 +152,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
             // to the store file.
             writer.append(kv);
           } else if (MobUtils.isMobReferenceCell(kv)) {
-            if (MobUtils.isValidMobRefCellValue(kv)) {
+            if (MobUtils.hasValidMobRefCellValue(kv)) {
               int size = MobUtils.getMobValueLength(kv);
               if (size > mobSizeThreshold) {
                 // If the value size is larger than the threshold, it's regarded as a mob. Since

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
new file mode 100644
index 0000000..d3c11ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The cleaner to delete the expired MOB files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleaner extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleaner.class);
+  /**
+   * Cleans the MOB files that are expired, provided minVersions of the column family is 0.
+   * If the latest timestamp of the cells in a MOB file is older than the TTL of the column
+   * family, the file is regarded as expired and this cleaner deletes it.
+   * Note that expired files are archived rather than removed outright: suppose the cells in a
+   * mob file M0 expire at a time T0, and a user starts a scan before T0 that is still running
+   * at a later time T1. If the cleaner runs at T1, M0 is archived, so the in-flight scan can
+   * still read it in the archive directory.
+   * @param tableName The current table name.
+   * @param family The current family.
+   * @throws ServiceException
+   * @throws IOException
+   */
+  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
+      throws ServiceException, IOException {
+    Configuration conf = getConf();
+    TableName tn = TableName.valueOf(tableName);
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
+    // disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
+    MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
+        EnvironmentEdgeManager.currentTime());
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    ToolRunner.run(conf, new ExpiredMobFileCleaner(), args);
+  }
+
+  private void printUsage() {
+    System.err.println("Usage:\n" + "--------------------------\n"
+        + ExpiredMobFileCleaner.class.getName() + " tableName familyName");
+    System.err.println(" tableName        The table name");
+    System.err.println(" familyName       The column family name");
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    String tableName = args[0];
+    String familyName = args[1];
+    TableName tn = TableName.valueOf(tableName);
+    HBaseAdmin.checkHBaseAvailable(getConf());
+    HBaseAdmin admin = new HBaseAdmin(getConf());
+    try {
+      HTableDescriptor htd = admin.getTableDescriptor(tn);
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (family == null || !MobUtils.isMobFamily(family)) {
+        throw new IOException("Column family " + familyName + " is not a MOB column family");
+      }
+      if (family.getMinVersions() > 0) {
+        throw new IOException(
+            "The minVersions of the column family is not 0, could not be handled by this cleaner");
+      }
+      cleanExpiredMobFiles(tableName, family);
+      return 0;
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.error("Fail to close the HBaseAdmin.", e);
+      }
+    }
+  }
+
+}
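
For reference, the tool can also be driven programmatically through ToolRunner,
exactly as main() above does; a minimal sketch with hypothetical table and
family names:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
    import org.apache.hadoop.util.ToolRunner;

    public class CleanerInvocationSketch {
      public static void main(String[] args) throws Exception {
        // Same as "ExpiredMobFileCleaner tableName familyName" on the command line;
        // "testtable" and "mobfamily" are hypothetical names.
        int exitCode = ToolRunner.run(HBaseConfiguration.create(),
            new ExpiredMobFileCleaner(), new String[] { "testtable", "mobfamily" });
        System.exit(exitCode);
      }
    }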

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 9978afd..4e3e7c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -38,6 +38,7 @@ public class MobConstants {
 
   public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
   public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+  public static final String MOB_SCAN_REF_ONLY = "hbase.mob.scan.ref.only";
 
   public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
   public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
@@ -46,6 +47,26 @@ public class MobConstants {
   public static final String MOB_REGION_NAME = ".mob";
   public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
 
+  public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period";
+  public static final int DEFAULT_MOB_CLEANER_PERIOD = 24 * 60 * 60 * 1000; // one day
+
+  public static final String MOB_SWEEP_TOOL_COMPACTION_START_DATE =
+      "hbase.mob.sweep.tool.compaction.start.date";
+  public static final String MOB_SWEEP_TOOL_COMPACTION_RATIO =
+      "hbase.mob.sweep.tool.compaction.ratio";
+  public static final String MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE =
+      "hbase.mob.sweep.tool.compaction.mergeable.size";
+
+  public static final float DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO = 0.5f;
+  public static final long DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE = 128 * 1024 * 1024;
+
+  public static final String MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME = "mobcompaction";
+
+  public static final String MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      "hbase.mob.sweep.tool.compaction.memstore.flush.size";
+  public static final long DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      1024 * 1024 * 128; // 128M
+
   public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
   public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
   public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
@@ -55,6 +76,7 @@ public class MobConstants {
   public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
 
   public final static String TEMP_DIR_NAME = ".tmp";
+  public final static String EMPTY_STRING = "";
   private MobConstants() {
 
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index e52d336..e49d3ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -18,13 +18,22 @@
  */
 package org.apache.hadoop.hbase.mob;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -34,7 +43,16 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -44,6 +62,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
 @InterfaceAudience.Private
 public class MobUtils {
 
+  private static final Log LOG = LogFactory.getLog(MobUtils.class);
+  private static final String COMPACTION_WORKING_DIR_NAME = "working";
+
   private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
       new ThreadLocal<SimpleDateFormat>() {
     @Override
@@ -143,6 +164,22 @@ public class MobUtils {
   }
 
   /**
+   * Indicates whether it's a reference only scan.
+   * The information is set in the attribute "hbase.mob.scan.ref.only" of the scan.
+   * If it's a ref-only scan, only the cells with a ref tag are returned.
+   * @param scan The current scan.
+   * @return True if it's a ref only scan.
+   */
+  public static boolean isRefOnlyScan(Scan scan) {
+    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
+    try {
+      return refOnly != null && Bytes.toBoolean(refOnly);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
    * Indicates whether the scan contains the information of caching blocks.
    * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
    * @param scan The current scan.
@@ -172,6 +209,91 @@ public class MobUtils {
   }
 
   /**
+   * Cleans the expired mob files.
+   * Cleans the files whose creation date is older than (current - columnFamily.ttl), provided
+   * the minVersions of that column family is 0.
+   * @param fs The current file system.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param columnDescriptor The descriptor of the current column family.
+   * @param cacheConfig The cacheConfig that disables the block cache.
+   * @param current The current time.
+   * @throws IOException
+   */
+  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
+      HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
+      throws IOException {
+    long timeToLive = columnDescriptor.getTimeToLive();
+    if (Integer.MAX_VALUE == timeToLive) {
+      // no need to clean, because the TTL is not set.
+      return;
+    }
+
+    Date expireDate = new Date(current - timeToLive * 1000);
+    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
+
+    FileStatus[] stats = null;
+    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
+    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
+    try {
+      stats = fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Fail to find the mob file " + path, e);
+    }
+    if (null == stats) {
+      // no file found
+      return;
+    }
+    List<StoreFile> filesToClean = new ArrayList<StoreFile>();
+    int deletedFileCount = 0;
+    for (FileStatus file : stats) {
+      String fileName = file.getPath().getName();
+      try {
+        MobFileName mobFileName = null;
+        if (!HFileLink.isHFileLink(file.getPath())) {
+          mobFileName = MobFileName.create(fileName);
+        } else {
+          HFileLink hfileLink = new HFileLink(conf, file.getPath());
+          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+        }
+        Date fileDate = parseDate(mobFileName.getDate());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking file " + fileName);
+        }
+        if (fileDate.getTime() < expireDate.getTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(fileName + " is an expired file");
+          }
+          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot parse the fileName " + fileName, e);
+      }
+    }
+    if (!filesToClean.isEmpty()) {
+      try {
+        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
+            filesToClean);
+        deletedFileCount = filesToClean.size();
+      } catch (IOException e) {
+        LOG.error("Fail to delete the mob files " + filesToClean, e);
+      }
+    }
+    LOG.info(deletedFileCount + " expired mob files were deleted");
+  }
+
+  /**
+   * Gets the znode name of the column family.
+   * @param tableName The current table name.
+   * @param familyName The name of the current column family.
+   * @return The znode name of the column family.
+   */
+  public static String getColumnFamilyZNodeName(String tableName, String familyName) {
+    return tableName + ":" + familyName;
+  }
+
+  /**
    * Gets the root dir of the mob files.
    * It's {HBASE_DIR}/mobdir.
    * @param conf The current configuration.
@@ -183,6 +305,19 @@ public class MobUtils {
   }
 
   /**
+   * Gets the qualified root dir of the mob files.
+   * @param conf The current configuration.
+   * @return The qualified root dir.
+   * @throws IOException
+   */
+  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+    FileSystem fs = mobRootDir.getFileSystem(conf);
+    return mobRootDir.makeQualified(fs);
+  }
+
+  /**
    * Gets the region dir of the mob files.
    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
    * @param conf The current configuration.
@@ -190,7 +325,7 @@ public class MobUtils {
    * @return The region dir of the mob files.
    */
   public static Path getMobRegionPath(Configuration conf, TableName tableName) {
-    Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
     HRegionInfo regionInfo = getMobRegionInfo(tableName);
     return new Path(tablePath, regionInfo.getEncodedName());
   }
@@ -232,36 +367,160 @@ public class MobUtils {
   }
 
   /**
+   * Indicates whether the current HRegionInfo is a mob region.
+   * @param regionInfo The current HRegionInfo.
+   * @return True if the current HRegionInfo is a mob region.
+   */
+  public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
+    return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
+        .equals(regionInfo.getEncodedName());
+  }
+
+  /**
+   * Gets the working directory of the mob compaction.
+   * @param root The root directory of the mob compaction.
+   * @param jobName The current job name.
+   * @return The directory of the mob compaction for the current job.
+   */
+  public static Path getCompactionWorkingPath(Path root, String jobName) {
+    Path parent = new Path(root, jobName);
+    return new Path(parent, COMPACTION_WORKING_DIR_NAME);
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param tableName The table name.
+   * @param tableDir The table directory.
+   * @param family The name of the column family.
+   * @param storeFiles The files to be deleted.
+   * @throws IOException
+   */
+  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
+      Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
+    HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
+        storeFiles);
+  }
+
+  /**
    * Creates a mob reference KeyValue.
    * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
-   * @param kv The original KeyValue.
+   * @param cell The original Cell.
    * @param fileName The mob file name where the mob reference KeyValue is written.
    * @param tableNameTag The tag of the current table name. It's very important in
    *                        cloning the snapshot.
    * @return The mob reference KeyValue.
    */
-  public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+  public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
     // Append the tags to the KeyValue.
     // The key is same, the value is the filename of the mob file
-    List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
-    existingTags.add(MobConstants.MOB_REF_TAG);
+    List<Tag> tags = new ArrayList<Tag>();
+    // Add the ref tag as the 1st one.
+    tags.add(MobConstants.MOB_REF_TAG);
+    // Add the existing tags.
+    tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
     // Add the tag of the source table name, this table is where this mob file is flushed
     // from.
     // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
     // find the original mob files by this table name. For details please see cloning
     // snapshot for mob files.
-    existingTags.add(tableNameTag);
-    int valueLength = kv.getValueLength();
+    tags.add(tableNameTag);
+    int valueLength = cell.getValueLength();
     byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
-    KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
-        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
-        kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
-        refValue, 0, refValue.length, existingTags);
-    reference.setSequenceId(kv.getSequenceId());
+    KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+        cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
+    reference.setSequenceId(cell.getSequenceId());
     return reference;
   }
 
   /**
+   * Creates a directory of mob files for flushing.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, its format is yyyymmdd.
+   * @param basePath The base path for a temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The hex string of the start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
+      throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+        .replaceAll("-", ""));
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withIncludesMvcc(false).withIncludesTags(true)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+        .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
+        .withFilePath(new Path(basePath, mobFileName.getFileName()))
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Commits the mob file.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param sourceFile The path where the mob file is saved.
+   * @param targetPath The directory path to which the source file is renamed.
+   * @param cacheConfig The current cache config.
+   * @throws IOException
+   */
+  public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+      Path targetPath, CacheConfig cacheConfig) throws IOException {
+    if (sourceFile == null) {
+      return;
+    }
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateMobFile(conf, fs, sourceFile, cacheConfig);
+    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+    LOG.info(msg);
+    Path parent = dstPath.getParent();
+    if (!fs.exists(parent)) {
+      fs.mkdirs(parent);
+    }
+    if (!fs.rename(sourceFile, dstPath)) {
+      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+    }
+  }
+
+  /**
+   * Validates a mob file by opening and closing it.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param path The path where the mob file is saved.
+   * @param cacheConfig The current cache config.
+   */
+  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
+      CacheConfig cacheConfig) throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader(false);
+      }
+    }
+  }
+
+  /**
    * Indicates whether the current mob ref cell has a valid value.
    * A mob ref cell has a mob reference tag.
    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
@@ -270,7 +529,7 @@ public class MobUtils {
    * @param cell The mob ref cell.
    * @return True if the cell has a valid value.
    */
-  public static boolean isValidMobRefCellValue(Cell cell) {
+  public static boolean hasValidMobRefCellValue(Cell cell) {
     return cell.getValueLength() > Bytes.SIZEOF_INT;
   }
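
For reference, the value layout checked above (a 4-byte int holding the real
mob value length, followed by the mob file name, as built by
createMobRefKeyValue) can be decoded with a sketch like the following; the
class and method names are illustrative:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobRefValueSketch {
      // Splits a mob ref cell value into its two parts.
      public static String decode(Cell cell) {
        byte[] value = new byte[cell.getValueLength()];
        System.arraycopy(cell.getValueArray(), cell.getValueOffset(), value, 0, value.length);
        int mobValueLength = Bytes.toInt(value, 0, Bytes.SIZEOF_INT);
        String mobFileName = Bytes.toString(value, Bytes.SIZEOF_INT,
            value.length - Bytes.SIZEOF_INT);
        return mobValueLength + " -> " + mobFileName;
      }
    }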
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
new file mode 100644
index 0000000..a9557d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The zookeeper used for MOB.
+ * This zookeeper is used to synchronize the HBase major compaction and the sweep tool.
+ * The structure of the mob nodes in zookeeper is as follows.
+ * |--baseNode
+ *     |--MOB
+ *         |--tableName:columnFamilyName-lock // locks for the mob column family
+ *         |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added
+ *         |--tableName:columnFamilyName-majorCompaction
+ *              |--UUID // when a major compaction occurs, such a node is added.
+ * In order to synchronize the operations between the sweep tool and HBase major compaction,
+ * both need to acquire the tableName:columnFamilyName-lock before they run.
+ * In the sweep tool:
+ * 1. If it acquires the lock successfully, it checks whether the sweeper node exists; if so,
+ * the current run is aborted. If not, it checks whether there are major compaction nodes;
+ * if there are, the current run is aborted, otherwise it adds a sweeper node to the zookeeper.
+ * 2. If it cannot obtain the lock, the current run is aborted.
+ * In the HBase compaction:
+ * 1. If it's a minor compaction, the compaction continues.
+ * 2. If it's a major compaction, it acquires a lock in zookeeper.
+ *    A. If it obtains the lock, it checks whether a sweeper node exists; if so, it converts
+ *    itself to a minor compaction and continues, otherwise it adds a major compaction node
+ *    to the zookeeper.
+ *    B. If it cannot obtain the lock, it converts itself to a minor compaction and continues
+ *    the compaction.
+ */
+@InterfaceAudience.Private
+public class MobZookeeper {
+  // TODO Will remove this class before the mob is merged back to master.
+  private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
+
+  private ZooKeeperWatcher zkw;
+  private String mobZnode;
+  private static final String LOCK_EPHEMERAL = "-lock";
+  private static final String SWEEPER_EPHEMERAL = "-sweeper";
+  private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction";
+
+  private MobZookeeper(Configuration conf, String identifier) throws IOException,
+      KeeperException {
+    this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable());
+    mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
+    if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
+      ZKUtil.createWithParents(zkw, mobZnode);
+    }
+  }
+
+  /**
+   * Creates a new instance of MobZookeeper.
+   * @param conf The current configuration.
+   * @param identifier string that is passed to RecoverableZookeeper to be used as
+   * identifier for this instance.
+   * @return A new instance of MobZookeeper.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException,
+      KeeperException {
+    return new MobZookeeper(conf, identifier);
+  }
+
+  /**
+   * Acquires a lock on the current column family.
+   * All threads that try to access the column family must acquire this lock, which is
+   * implemented as an ephemeral node in the zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if the lock is obtained successfully. Otherwise false is returned.
+   */
+  public boolean lockColumnFamily(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    boolean locked = false;
+    try {
+      locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(locked ? "Locked the column family " + znodeName
+            : "Can not lock the column family " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to lock the column family " + znodeName, e);
+    }
+    return locked;
+  }
+
+  /**
+   * Releases the lock on the current column family.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   */
+  public void unlockColumnFamily(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unlocking the column family " + znodeName);
+    }
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.warn("Fail to unlock the column family " + znodeName, e);
+    }
+  }
+
+  /**
+   * Adds a node to zookeeper which indicates that a sweep tool is running.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param data the data of the ephemeral node.
+   * @return True if the node is created successfully. Otherwise false is returned.
+   */
+  public boolean addSweeperZNode(String tableName, String familyName, byte[] data) {
+    boolean add = false;
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    try {
+      add = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(add ? "Added a znode for sweeper " + znodeName
+            : "Cannot add a znode for sweeper " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to add a znode for sweeper " + znodeName, e);
+    }
+    return add;
+  }
+
+  /**
+   * Gets the path of the sweeper znode in zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return The path of the sweeper znode in the zookeeper.
+   */
+  public String getSweeperZNodePath(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL);
+  }
+
+  /**
+   * Deletes the sweeper node from zookeeper, which indicates that the sweep tool is finished.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   */
+  public void deleteSweeperZNode(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.error("Fail to delete a znode for sweeper " + znodeName, e);
+    }
+  }
+
+  /**
+   * Checks whether the sweeper znode exists in the Zookeeper.
+   * If the node exists, it means a sweep tool is running.
+   * Otherwise, it is not.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if the node exists. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
+  }
+
+  /**
+   * Checks whether there are major compaction nodes in the zookeeper.
+   * If there are such nodes, it means major compactions are in progress now.
+   * Otherwise there are not.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if there are major compactions in progress. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean hasMajorCompactionChildren(String tableName, String familyName)
+      throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
+    return children != null && !children.isEmpty();
+  }
+
+  /**
+   * Creates a major compaction node in the Zookeeper.
+   * Before an HBase major compaction, such a node is created in the Zookeeper. It tells others
+   * that a major compaction is in progress, so the sweep tool cannot run at this time.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param compactionName The current compaction name.
+   * @return True if the node is created successfully. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean addMajorCompactionZNode(String tableName, String familyName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
+  }
+
+  /**
+   * Deletes a major compaction node from the Zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param compactionName The current compaction name.
+   * @throws KeeperException
+   */
+  public void deleteMajorCompactionZNode(String tableName, String familyName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    ZKUtil.deleteNode(zkw, eachMcPath);
+  }
+
+  /**
+   * Closes the MobZookeeper.
+   */
+  public void close() {
+    this.zkw.close();
+  }
+
+  /**
+   * A dummy abortable used by the MobZookeeper.
+   */
+  public static class DummyMobAbortable implements Abortable {
+
+    private boolean abort = false;
+
+    public void abort(String why, Throwable e) {
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+
+  }
+}
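
For reference, a sketch of the sweep-tool side of the handshake described in
the class comment, using the methods above; the class name, table name and
family name are hypothetical, and error handling is elided:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.MobZookeeper;

    public class SweepHandshakeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        MobZookeeper zk = MobZookeeper.newInstance(conf, "sweep-tool-sketch");
        String table = "testtable", family = "mobfamily"; // hypothetical names
        try {
          // Step 1: take the per-column-family lock, or abort this run.
          if (!zk.lockColumnFamily(table, family)) {
            return;
          }
          try {
            // Abort if another sweeper or a major compaction is in progress.
            if (zk.isSweeperZNodeExist(table, family)
                || zk.hasMajorCompactionChildren(table, family)) {
              return;
            }
            // Announce this run with a sweeper node.
            zk.addSweeperZNode(table, family, new byte[0]);
          } finally {
            zk.unlockColumnFamily(table, family);
          }
          // ... sweeping happens here; on completion the sweeper node is removed.
          zk.deleteSweeperZNode(table, family);
        } finally {
          zk.close();
        }
      }
    }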

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
new file mode 100644
index 0000000..b0d4c9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * A wrapper of a DefaultMemStore.
+ * This wrapper is used in the sweep reducer to buffer and sort the cells written from
+ * the invalid and small mob files.
+ * When it's full it's flushed: the mob data are written to new mob files, and their file names
+ * are written back to the store files of HBase.
+ * In a reducer of the sweep tool, the mob files are grouped by the same prefix (start key and
+ * date); in each group, the reducer iterates over the files and reads their cells into a new,
+ * bigger mob file.
+ * The cells within one mob file are ordered, but cells across mob files are not.
+ * So we need this MemStoreWrapper to sort the cells that come from different mob files before
+ * flushing them to disk; when the memStore is big enough, it's flushed as a new mob file.
+ */
+@InterfaceAudience.Private
+public class MemStoreWrapper {
+
+  private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+  private MemStore memstore;
+  private long flushSize;
+  private SweepPartitionId partitionId;
+  private Context context;
+  private Configuration conf;
+  private HTable table;
+  private HColumnDescriptor hcd;
+  private Path mobFamilyDir;
+  private FileSystem fs;
+  private CacheConfig cacheConfig;
+
+  public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd,
+      MemStore memstore, CacheConfig cacheConfig) throws IOException {
+    this.memstore = memstore;
+    this.context = context;
+    this.fs = fs;
+    this.table = table;
+    this.hcd = hcd;
+    this.conf = context.getConfiguration();
+    this.cacheConfig = cacheConfig;
+    flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+        MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+    mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+  }
+
+  public void setPartitionId(SweepPartitionId partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Flushes the memstore if the size is large enough.
+   * @throws IOException
+   */
+  private void flushMemStoreIfNecessary() throws IOException {
+    if (memstore.heapSize() >= flushSize) {
+      flushMemStore();
+    }
+  }
+
+  /**
+   * Flushes the memstore regardless of its size.
+   * @throws IOException
+   */
+  public void flushMemStore() throws IOException {
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    internalFlushCache(snapshot);
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  /**
+   * Flushes the snapshot of the memstore.
+   * Flushes the mob data to the mob files, and flushes the names of these mob files to HBase.
+   * @param snapshot The snapshot of the memstore.
+   * @throws IOException
+   */
+  private void internalFlushCache(final MemStoreSnapshot snapshot)
+      throws IOException {
+    if (snapshot.getSize() == 0) {
+      return;
+    }
+    // generate the files into a temp directory.
+    String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+    StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
+        partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
+        hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
+
+    String relativePath = mobFileWriter.getPath().getName();
+    LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
+
+    byte[] referenceValue = Bytes.toBytes(relativePath);
+    int keyValueCount = 0;
+    KeyValueScanner scanner = snapshot.getScanner();
+    Cell cell = null;
+    while (null != (cell = scanner.next())) {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+      mobFileWriter.append(kv);
+      keyValueCount++;
+    }
+    scanner.close();
+    // Write out the log sequence number that corresponds to this output
+    // hfile. The hfile is current up to and including logCacheFlushId.
+    mobFileWriter.appendMetadata(Long.MAX_VALUE, false);
+    mobFileWriter.close();
+
+    MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+    context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+    // write reference/fileName back to the store files of HBase.
+    scanner = snapshot.getScanner();
+    scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    cell = null;
+    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName());
+    while (null != (cell = scanner.next())) {
+      KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+      Put put =
+          new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+      put.add(reference);
+      table.put(put);
+      context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
+    }
+    if (keyValueCount > 0) {
+      table.flushCommits();
+    }
+    scanner.close();
+  }
+
+  /**
+   * Adds a KeyValue into the memstore.
+   * @param kv The KeyValue to be added.
+   * @throws IOException
+   */
+  public void addToMemstore(KeyValue kv) throws IOException {
+    memstore.add(kv);
+    // flush the memstore if it's full.
+    flushMemStoreIfNecessary();
+  }
+
+}
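
For reference, a sketch of how a reducer-side caller might drive the wrapper,
per the flow in the class comment; the class and method are hypothetical, and
the wrapper itself is constructed in the reducer's setup:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.mob.mapreduce.MemStoreWrapper;
    import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;

    public class MemStoreWrapperUsageSketch {
      // Cells read from several old mob files in one partition are pushed through
      // the wrapper, which sorts them and flushes full memstores as new mob files.
      static void sweepPartition(MemStoreWrapper memstore, SweepPartitionId id,
          List<KeyValue> cellsFromOldMobFiles) throws IOException {
        memstore.setPartitionId(id); // the partition's start key and date
        for (KeyValue kv : cellsFromOldMobFiles) {
          memstore.addToMemstore(kv); // flushes automatically when full
        }
        memstore.flushMemStore(); // flush whatever is left at the end
      }
    }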

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
new file mode 100644
index 0000000..bdec887
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * The partitioner for the sweep job.
+ * The input key is a mob file name; files are bucketed by the date encoded in the name.
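+ * <p>
+ * For example (a sketch; suppose the date part of the file name is "20140919"):
+ * partition = ("20140919".hashCode() &amp; Integer.MAX_VALUE) % numPartitions.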
+ */
+@InterfaceAudience.Private
+public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
+
+  @Override
+  public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
+    MobFileName mobFileName = MobFileName.create(fileName.toString());
+    String date = mobFileName.getDate();
+    int hash = date.hashCode();
+    return (hash & Integer.MAX_VALUE) % numPartitions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
new file mode 100644
index 0000000..1d048bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -0,0 +1,550 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The sweep job.
+ * Runs MapReduce to merge the smaller mob files into bigger ones and to clean up the unused ones.
+ */
+@InterfaceAudience.Private
+public class SweepJob {
+
+  private final FileSystem fs;
+  private final Configuration conf;
+  private static final Log LOG = LogFactory.getLog(SweepJob.class);
+  static final String SWEEP_JOB_ID = "mob.compaction.id";
+  static final String SWEEPER_NODE = "mob.compaction.sweep.node";
+  static final String WORKING_DIR_KEY = "mob.compaction.dir";
+  static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file";
+  static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir";
+  static final String WORKING_ALLNAMES_DIR = "all";
+  static final String WORKING_VISITED_DIR = "visited";
+  public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir";
+  // The MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed in tests.
+  public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay";
+  protected static long ONE_DAY = 24 * 60 * 60 * 1000;
+  private long compactionStartTime = EnvironmentEdgeManager.currentTime();
+  public final static String CREDENTIALS_LOCATION = "credentials_location";
+  private CacheConfig cacheConfig;
+  static final int SCAN_CACHING = 10000;
+
+  public SweepJob(Configuration conf, FileSystem fs) {
+    this.conf = conf;
+    this.fs = fs;
+    // disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    cacheConfig = new CacheConfig(copyOfConf);
+  }
+
+  /**
+   * Runs MapReduce to sweep the mob files.
+   * A MobReferenceOnlyFilter ensures that the mappers only receive the cells that hold
+   * mob references from the rows of 'normal' regions.
+   * Two runs of the sweep tool on the same column family are mutually exclusive, and so
+   * are an HBase major compaction and a run of the sweep tool on the same column family.
+   * The synchronization is done through ZooKeeper.
+   * So at the beginning of the run we need to make sure this sweep tool is the only one
+   * running on this column family, and that no major compaction of this column family
+   * is in progress.
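+   * <p>
+   * A minimal invocation sketch (the table 't' and the family 'f' are hypothetical):
+   * <pre>
+   *   Configuration conf = HBaseConfiguration.create();
+   *   SweepJob sweepJob = new SweepJob(conf, FileSystem.get(conf));
+   *   sweepJob.sweep(TableName.valueOf("t"), new HColumnDescriptor("f"));
+   * </pre>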
+   * @param tn The current table name.
+   * @param family The descriptor of the current column family.
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void sweep(TableName tn, HColumnDescriptor family) throws IOException,
+      ClassNotFoundException, InterruptedException, KeeperException {
+    Configuration conf = new Configuration(this.conf);
+    // check whether the current user is the owner of the HBase root directory
+    String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+    FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
+    if (hbaseRootFileStat.length > 0) {
+      String owner = hbaseRootFileStat[0].getOwner();
+      if (!owner.equals(currentUserName)) {
+        String errorMsg = "The current user[" + currentUserName
+            + "] doesn't own the HBase root directory."
+            + " Please run the sweep tool as the owner of the target HBase";
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    } else {
+      LOG.error("The target HBase doesn't exist");
+      throw new IOException("The target HBase doesn't exist");
+    }
+    String familyName = family.getNameAsString();
+    String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
+    MobZookeeper zk = MobZookeeper.newInstance(conf, id);
+    try {
+      // Try to obtain the lock. This lock is used to synchronize all the queries,
+      // creations and deletions in ZooKeeper.
+      if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) {
+        LOG.warn("Can not lock the store " + familyName
+            + ". The major compaction in HBase may be in-progress. Please re-run the job.");
+        return;
+      }
+      try {
+        // Check whether an HBase major compaction is currently in progress.
+        boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName);
+        if (hasChildren) {
+          LOG.warn("The major compaction in HBase may be in-progress."
+              + " Please re-run the job.");
+          return;
+        } else {
+          // Check whether another sweep tool is in progress.
+          boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName);
+          if (hasSweeper) {
+            LOG.warn("Another sweep job is running");
+            return;
+          } else {
+            // Add the sweeper node to mark that a sweep tool is in progress.
+            // No HBase major compaction or other sweep tool on this column family
+            // can run until this sweep tool is finished.
+            zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id));
+          }
+        }
+      } finally {
+        zk.unlockColumnFamily(tn.getNameAsString(), familyName);
+      }
+      Job job = null;
+      try {
+        Scan scan = new Scan();
+        scan.addFamily(family.getName());
+        // Do not retrieve the mob data when scanning
+        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+        scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+        scan.setCaching(SCAN_CACHING);
+        scan.setCacheBlocks(false);
+        scan.setMaxVersions(family.getMaxVersions());
+        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+            JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+        conf.set(SWEEP_JOB_ID, id);
+        conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName));
+        job = prepareJob(tn, familyName, scan, conf);
+        job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
+        // Record the compaction start time.
+        // The sweep tool only handles the mob files whose modification time is older
+        // than (startTime - delay).
+        // The delay is one day by default. It is configurable as well, but that is
+        // only used in tests.
+        job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+            compactionStartTime);
+
+        job.setPartitionerClass(MobFilePathHashPartitioner.class);
+        submit(job, tn, familyName);
+        if (job.waitForCompletion(true)) {
+          // Archive the unused mob files.
+          removeUnusedFiles(job, tn, family);
+        }
+      } finally {
+        cleanup(job, tn, familyName);
+        zk.deleteSweeperZNode(tn.getNameAsString(), familyName);
+      }
+    } finally {
+      zk.close();
+    }
+  }
+
+  /**
+   * Prepares a MapReduce job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @param scan The current scan.
+   * @param conf The current configuration.
+   * @return A MapReduce job.
+   * @throws IOException
+   */
+  private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
+      throws IOException {
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(SweepMapper.class);
+    TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
+        SweepMapper.class, Text.class, Writable.class, job);
+
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(KeyValue.class);
+    job.setReducerClass(SweepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    String jobName = getCustomJobName(this.getClass().getSimpleName(), tn.getNameAsString(),
+        familyName);
+    job.setJobName(jobName);
+    if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+      String fileLoc = conf.get(CREDENTIALS_LOCATION);
+      Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+      job.getCredentials().addAll(cred);
+    }
+    return job;
+  }
+
+  /**
+   * Gets a customized job name.
+   * It's className-mapperClassName-reducerClassName-tableName-familyName.
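+   * For example, sweeping the family 'f' of the table 't' produces the job name
+   * "SweepJob-SweepMapper-SweepReducer-t-f".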
+   * @param className The current class name.
+   * @param tableName The current table name.
+   * @param familyName The current family name.
+   * @return The customized job name.
+   */
+  private static String getCustomJobName(String className, String tableName, String familyName) {
+    StringBuilder name = new StringBuilder();
+    name.append(className);
+    name.append('-').append(SweepMapper.class.getSimpleName());
+    name.append('-').append(SweepReducer.class.getSimpleName());
+    name.append('-').append(tableName);
+    name.append('-').append(familyName);
+    return name.toString();
+  }
+
+  /**
+   * Submits a job.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @throws IOException
+   */
+  private void submit(Job job, TableName tn, String familyName) throws IOException {
+    // delete the temp directory of the mob files in case of a failure in the previous
+    // execution.
+    Path tempDir =
+        new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
+    Path mobCompactionTempDir =
+        new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
+    Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
+    job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
+    // delete the working directory in case it was not deleted by the last run.
+    fs.delete(workingPath, true);
+    // create the working directory.
+    fs.mkdirs(workingPath);
+    // create a sequence file which contains the names of all the existing files.
+    Path workingPathOfFiles = new Path(workingPath, "files");
+    Path workingPathOfNames = new Path(workingPath, "names");
+    job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
+    Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
+    job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+    Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
+    job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    // create a file that includes all the existing mob files whose creation time is
+    // older than (now - oneDay)
+    fs.create(allFileNamesPath, true);
+    // create a directory where the files that contain the names of visited mob files
+    // are saved.
+    fs.mkdirs(visitedFileNamesPath);
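+    // The resulting layout under the working directory is, roughly:
+    //   <workingPath>/files         - temp files used while merging the mob files
+    //   <workingPath>/names/all     - a sequence file listing all existing mob file names
+    //   <workingPath>/names/visited - files listing the mob file names visited by reducers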
+    Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
+    // Find all the files whose creation time is older than one day and write their
+    // names to a file.
+    // Each reducer has a writer that writes the names of the files it visits to a
+    // file saved under WORKING_VISITED_DIR.
+    // After the job is finished, compare those files to find out the unused mob files,
+    // then archive them.
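+    // For example, if the "all" file lists {f1, f2, f3} and the visited files together
+    // list {f2, f3}, then f1 was never referenced and will be archived.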
+    FileStatus[] files = fs.listStatus(mobStorePath);
+    Set<String> fileNames = new TreeSet<String>();
+    long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+    for (FileStatus fileStatus : files) {
+      if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
+        if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
+          // only record the potentially unused files older than one day.
+          fileNames.add(fileStatus.getPath().getName());
+        }
+      }
+    }
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
+        allFileNamesPath, String.class, String.class);
+    try {
+      for (String fileName : fileNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  /**
+   * Gets the unused mob files.
+   * Compares the file that lists all the existing mob files with the files of visited
+   * names, and returns the names that were never visited so that they can be archived.
+   * @param conf The current configuration.
+   * @return The unused mob files.
+   * @throws IOException
+   */
+  List<String> getUnusedFiles(Configuration conf) throws IOException {
+    // find out the unused files by merging and comparing the two sorted name lists
+    Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
+    SequenceFile.Reader allNamesReader = null;
+    MergeSortReader visitedNamesReader = null;
+    List<String> toBeArchived = new ArrayList<String>();
+    try {
+      allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
+      visitedNamesReader = new MergeSortReader(fs, conf,
+          new Path(conf.get(WORKING_VISITED_DIR_KEY)));
+      String nextAll = (String) allNamesReader.next((String) null);
+      String nextVisited = visitedNamesReader.next();
+      do {
+        if (nextAll != null) {
+          if (nextVisited != null) {
+            int compare = nextAll.compareTo(nextVisited);
+            if (compare < 0) {
+              toBeArchived.add(nextAll);
+              nextAll = (String) allNamesReader.next((String) null);
+            } else if (compare > 0) {
+              nextVisited = visitedNamesReader.next();
+            } else {
+              nextAll = (String) allNamesReader.next((String) null);
+              nextVisited = visitedNamesReader.next();
+            }
+          } else {
+            toBeArchived.add(nextAll);
+            nextAll = (String) allNamesReader.next((String) null);
+          }
+        } else {
+          break;
+        }
+      } while (nextAll != null || nextVisited != null);
+    } finally {
+      if (allNamesReader != null) {
+        allNamesReader.close();
+      }
+      if (visitedNamesReader != null) {
+        visitedNamesReader.close();
+      }
+    }
+    return toBeArchived;
+  }
+
+  /**
+   * Archives unused mob files.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param hcd The descriptor of the current column family.
+   * @throws IOException
+   */
+  private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
+    // find out the unused files and archive them
+    List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+    List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
+    // archive them
+    Path mobStorePath = MobUtils
+        .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
+    for (String archiveFileName : toBeArchived) {
+      Path path = new Path(mobStorePath, archiveFileName);
+      storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
+    }
+    if (!storeFiles.isEmpty()) {
+      try {
+        MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
+            FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
+        LOG.info(storeFiles.size() + " unused MOB files were removed");
+      } catch (Exception e) {
+        LOG.error("Fail to archive the store files " + storeFiles, e);
+      }
+    }
+  }
+
+  /**
+   * Deletes the working directory.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @throws IOException
+   */
+  private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+    if (job != null) {
+      // delete the working directory
+      Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
+      try {
+        fs.delete(workingPath, true);
+      } catch (IOException e) {
+        LOG.warn("Fail to delete the working directory after sweeping store " + familyName
+            + " in the table " + tn.getNameAsString(), e);
+      }
+    }
+  }
+
+  /**
+   * A result paired with the index of the reader it came from.
+   */
+  private class IndexedResult implements Comparable<IndexedResult> {
+    private int index;
+    private String value;
+
+    public IndexedResult(int index, String value) {
+      this.index = index;
+      this.value = value;
+    }
+
+    public int getIndex() {
+      return this.index;
+    }
+
+    public String getValue() {
+      return this.value;
+    }
+
+    @Override
+    public int compareTo(IndexedResult o) {
+      // Sort nulls first so that the ordering stays antisymmetric.
+      if (this.value == null) {
+        return o.value == null ? 0 : -1;
+      } else if (o.value == null) {
+        return 1;
+      } else {
+        return this.value.compareTo(o.value);
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof IndexedResult)) {
+        return false;
+      }
+      return compareTo((IndexedResult) obj) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return value == null ? 0 : value.hashCode();
+    }
+  }
+
+  /**
+   * A merge-sort reader.
+   * It merges the sorted readers over different sequence files into a single stream
+   * where the results are read back in order.
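+   * For example, readers over {a, c} and {b, d} are read back as a, b, c, d.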
+   */
+  private class MergeSortReader {
+
+    private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
+    private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
+
+    public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
+      if (fs.exists(path)) {
+        FileStatus[] files = fs.listStatus(path);
+        int index = 0;
+        for (FileStatus file : files) {
+          if (file.isFile()) {
+            SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+            String key = (String) reader.next((String) null);
+            if (key != null) {
+              results.add(new IndexedResult(index, key));
+              readers.add(reader);
+              index++;
+            }
+          }
+        }
+      }
+    }
+
+    public String next() throws IOException {
+      IndexedResult result = results.poll();
+      if (result != null) {
+        SequenceFile.Reader reader = readers.get(result.getIndex());
+        String key = (String) reader.next((String) null);
+        if (key != null) {
+          results.add(new IndexedResult(result.getIndex(), key));
+        }
+        return result.getValue();
+      }
+      return null;
+    }
+
+    public void close() {
+      for (SequenceFile.Reader reader : readers) {
+        IOUtils.closeStream(reader);
+      }
+    }
+  }
+
+  /**
+   * The counters used in the sweep job.
+   */
+  public enum SweepCounter {
+
+    /**
+     * How many files are read.
+     */
+    INPUT_FILE_COUNT,
+
+    /**
+     * How many files need to be merged or cleaned.
+     */
+    FILE_TO_BE_MERGE_OR_CLEAN,
+
+    /**
+     * How many files are left after merging.
+     */
+    FILE_AFTER_MERGE_OR_CLEAN,
+
+    /**
+     * How many records are updated.
+     */
+    RECORDS_UPDATED,
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
new file mode 100644
index 0000000..b789332
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker of the sweep tool node in ZooKeeper.
+ * The sweep tool node is an ephemeral one: when the process dies, this node is deleted,
+ * while the MR job might still be running; if another sweep job were started then, two
+ * MR jobs for the same column family would run at the same time.
+ * This tracker watches this ephemeral node; if it's gone, or it was not created by the
+ * sweep job that owns the current MR job, the current process is aborted.
+ */
+@InterfaceAudience.Private
+public class SweepJobNodeTracker extends ZooKeeperListener {
+
+  private String node;
+  private String sweepJobId;
+
+  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+    super(watcher);
+    this.node = node;
+    this.sweepJobId = sweepJobId;
+  }
+
+  /**
+   * Registers the watcher on the sweep job node.
+   * If there is no such sweep job node, or it was not created by the sweep job that
+   * owns the current MR job, the current process is aborted.
+   */
+  public void start() throws KeeperException {
+    watcher.registerListener(this);
+    if (ZKUtil.watchAndCheckExists(watcher, node)) {
+      byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+      if (data != null) {
+        if (!sweepJobId.equals(Bytes.toString(data))) {
+          System.exit(1);
+        }
+      }
+    } else {
+      System.exit(1);
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    // If the ephemeral node is deleted, abort the current process.
+    if (node.equals(path)) {
+      System.exit(1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
new file mode 100644
index 0000000..f508b93
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The mapper of the sweep job.
+ * Takes the rows of the table and their results and maps them to
+ * &lt;fileName:Text, mobValue:KeyValue&gt; pairs, where mobValue is the actual
+ * reference cell stored in HBase.
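+ * Since the cells are keyed by the name of the mob file they reference, all the
+ * references into the same mob file are grouped into the same reduce call.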
+ */
+@InterfaceAudience.Private
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+  private ZooKeeperWatcher zkw = null;
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+      tracker.start();
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+    if (zkw != null) {
+      zkw.close();
+    }
+  }
+
+  @Override
+  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+      InterruptedException {
+    if (columns == null) {
+      return;
+    }
+    KeyValue[] kvList = columns.raw();
+    if (kvList == null || kvList.length == 0) {
+      return;
+    }
+    for (KeyValue kv : kvList) {
+      if (MobUtils.hasValidMobRefCellValue(kv)) {
+        String fileName = MobUtils.getMobFileName(kv);
+        context.write(new Text(fileName), kv);
+      }
+    }
+  }
+}