Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/19 12:11:01 UTC

[1/2] HBASE-11644 External MOB compaction tools (Jingcheng Du)

Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 9cf46dcbe -> 84e957c87


http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
new file mode 100644
index 0000000..04fe359
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -0,0 +1,506 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The reducer of a sweep job.
+ * This reducer merges the small mob files into bigger ones, and writes the visited
+ * names of mob files to a sequence file which is used by the sweep job to delete
+ * the unused mob files.
+ * The key of the input is a file name, the value is a collection of KeyValues where
+ * each KeyValue is the actual cell (its format is valueLength + fileName) in HBase.
+ * In this reducer, we can find out how many cells exist in HBase for a mob file.
+ * If existCellSize/mobFileSize < compactionRatio, this mob
+ * file needs to be merged.
+ */
+@InterfaceAudience.Private
+public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+
+  private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+
+  private SequenceFile.Writer writer = null;
+  private MemStoreWrapper memstore;
+  private Configuration conf;
+  private FileSystem fs;
+
+  private Path familyDir;
+  private CacheConfig cacheConfig;
+  private long compactionBegin;
+  private HTable table;
+  private HColumnDescriptor family;
+  private long mobCompactionDelay;
+  private Path mobTableDir;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    this.fs = FileSystem.get(conf);
+    // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
+    mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
+    String tableName = conf.get(TableInputFormat.INPUT_TABLE);
+    String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+    TableName tn = TableName.valueOf(tableName);
+    this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    try {
+      family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
+      if (family == null) {
+        // this column family might have been removed.
+        throw new InvalidFamilyOperationException("Column family '" + familyName
+            + "' does not exist. It might have been removed.");
+      }
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close the HBaseAdmin", e);
+      }
+    }
+    // disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
+    this.cacheConfig = new CacheConfig(copyOfConf);
+
+    table = new HTable(this.conf, Bytes.toBytes(tableName));
+    table.setAutoFlush(false, false);
+
+    table.setWriteBufferSize(1 * 1024 * 1024); // 1MB
+    memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
+
+    // The start time of the sweep tool.
+    // Only the mob files whose creation time is older than startTime-oneDay will be handled by the
+    // reducer, since handling the latest mob files could bring inconsistency.
+    this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
+    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
+  }
+
+  private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
+    return new SweepPartition(id, context);
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
+      tracker.start();
+      setup(context);
+      // create a sequence file that contains all the visited file names in this reducer.
+      String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
+      Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
+          .replace("-", MobConstants.EMPTY_STRING));
+      if (!fs.exists(nameFilePath)) {
+        fs.create(nameFilePath, true);
+      }
+      writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath,
+          String.class, String.class);
+      SweepPartitionId id;
+      SweepPartition partition = null;
+      // the mob files which have the same start key and date are in the same partition.
+      while (context.nextKey()) {
+        Text key = context.getCurrentKey();
+        String keyString = key.toString();
+        id = SweepPartitionId.create(keyString);
+        if (null == partition || !id.equals(partition.getId())) {
+          // It's the first mob file of a new partition.
+          if (null != partition) {
+            // this mob file is in a different partition from the previous one,
+            // close the previous partition first.
+            partition.close();
+          }
+          // create a new one
+          partition = createPartition(id, context);
+        }
+        if (partition != null) {
+          // run the partition
+          partition.execute(key, context.getValues());
+        }
+      }
+      if (null != partition) {
+        partition.close();
+      }
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      cleanup(context);
+      zkw.close();
+      if (writer != null) {
+        IOUtils.closeStream(writer);
+      }
+      if (table != null) {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * The mob files which have the same start key and date are in the same partition.
+   * The files in the same partition are merged together into bigger ones.
+   */
+  public class SweepPartition {
+
+    private final SweepPartitionId id;
+    private final Context context;
+    private boolean memstoreUpdated = false;
+    private boolean mergeSmall = false;
+    private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
+    private final List<Path> toBeDeleted = new ArrayList<Path>();
+
+    public SweepPartition(SweepPartitionId id, Context context) throws IOException {
+      this.id = id;
+      this.context = context;
+      memstore.setPartitionId(id);
+      init();
+    }
+
+    public SweepPartitionId getId() {
+      return this.id;
+    }
+
+    /**
+     * Prepares the map of files.
+     *
+     * @throws IOException
+     */
+    private void init() throws IOException {
+      FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
+      if (null == fileStats) {
+        return;
+      }
+
+      int smallFileCount = 0;
+      float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
+          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
+      long compactionMergeableSize = conf.getLong(
+          MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
+          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
+      // list the files. Just merge the hfiles, don't merge the hfile links.
+      // prepare the map of mob files. The key is the file name, the value is the file status.
+      for (FileStatus fileStat : fileStats) {
+        MobFileStatus mobFileStatus = null;
+        if (!HFileLink.isHFileLink(fileStat.getPath())) {
+          mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
+          if (mobFileStatus.needMerge()) {
+            smallFileCount++;
+          }
+          // key is file name (not hfile name), value is hfile status.
+          fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
+        }
+      }
+      if (smallFileCount >= 2) {
+        // merge the files only when there is more than one file in the same partition.
+        this.mergeSmall = true;
+      }
+    }
+
+    /**
+     * Flushes the data into mob files and store files, and archives the small
+     * files after they're merged.
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      if (null == id) {
+        return;
+      }
+      // flush remaining key values into mob files
+      if (memstoreUpdated) {
+        memstore.flushMemStore();
+      }
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
+      // delete small files after compaction
+      for (Path path : toBeDeleted) {
+        LOG.info("[In Partition close] Delete the file " + path);
+        storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
+      }
+      if (!storeFiles.isEmpty()) {
+        try {
+          MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
+              storeFiles);
+          context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
+        } catch (IOException e) {
+          LOG.error("Failed to archive the store files " + storeFiles, e);
+        }
+        storeFiles.clear();
+      }
+      fileStatusMap.clear();
+    }
+
+    /**
+     * Merges the small mob files into bigger ones.
+     * @param fileName The current mob file name.
+     * @param values The collection of KeyValues in this mob file.
+     * @throws IOException
+     */
+    public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
+      if (null == values) {
+        return;
+      }
+      MobFileName mobFileName = MobFileName.create(fileName.toString());
+      LOG.info("[In reducer] The file name: " + fileName.toString());
+      MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
+      if (null == mobFileStat) {
+        LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
+        return;
+      }
+      // only handle the files that are older than one day.
+      if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
+          <= mobCompactionDelay) {
+        return;
+      }
+      // write the hfile name
+      writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
+      Set<KeyValue> kvs = new HashSet<KeyValue>();
+      for (KeyValue kv : values) {
+        if (kv.getValueLength() > Bytes.SIZEOF_INT) {
+          mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
+              Bytes.SIZEOF_INT));
+        }
+        kvs.add(kv.createKeyOnly(false));
+      }
+      // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
+      if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
+        context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+        MobFile file = MobFile.create(fs,
+            new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
+        StoreFileScanner scanner = null;
+        try {
+          scanner = file.getScanner();
+          scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
+          Cell cell;
+          while (null != (cell = scanner.next())) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            KeyValue keyOnly = kv.createKeyOnly(false);
+            if (kvs.contains(keyOnly)) {
+              // write the KeyValue existing in HBase to the memstore.
+              memstore.addToMemstore(kv);
+              memstoreUpdated = true;
+            }
+          }
+        } finally {
+          if (scanner != null) {
+            scanner.close();
+          }
+        }
+        toBeDeleted.add(mobFileStat.getFileStatus().getPath());
+      }
+    }
+
+    /**
+     * Lists the files with the same prefix.
+     * @param p The file path.
+     * @param prefix The prefix.
+     * @return The files with the same prefix.
+     * @throws IOException
+     */
+    private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+      return fs.listStatus(p, new PathPrefixFilter(prefix));
+    }
+  }
+
+  static class PathPrefixFilter implements PathFilter {
+
+    private final String prefix;
+
+    public PathPrefixFilter(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public boolean accept(Path path) {
+      return path.getName().startsWith(prefix, 0);
+    }
+
+  }
+
+  /**
+   * The sweep partition id.
+   * It consists of the start key and date.
+   * The start key is a hex string of the checksum of a region start key.
+   * The date is the latest timestamp of cells in a mob file.
+   */
+  public static class SweepPartitionId {
+    private String date;
+    private String startKey;
+
+    public SweepPartitionId(MobFileName fileName) {
+      this.date = fileName.getDate();
+      this.startKey = fileName.getStartKey();
+    }
+
+    public SweepPartitionId(String date, String startKey) {
+      this.date = date;
+      this.startKey = startKey;
+    }
+
+    public static SweepPartitionId create(String key) {
+      return new SweepPartitionId(MobFileName.create(key));
+    }
+
+    @Override
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (anObject instanceof SweepPartitionId) {
+        SweepPartitionId another = (SweepPartitionId) anObject;
+        if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public void setDate(String date) {
+      this.date = date;
+    }
+
+    public void setStartKey(String startKey) {
+      this.startKey = startKey;
+    }
+  }
+
+  /**
+   * The mob file status used in the sweep reducer.
+   */
+  private static class MobFileStatus {
+    private FileStatus fileStatus;
+    private int validSize;
+    private long size;
+
+    private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
+    private long compactionMergeableSize =
+        MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
+
+    /**
+     * @param fileStatus The current FileStatus.
+     * @param compactionRatio The invalid ratio.
+     * If too many cells are deleted in a mob file, it's regarded as invalid,
+     * and needs to be written to a new one.
+     * If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
+     * @param compactionMergeableSize If the size of a mob file is less
+     * than this value, it's regarded as a small file and needs to be merged.
+     */
+    public MobFileStatus(FileStatus fileStatus, float compactionRatio,
+        long compactionMergeableSize) {
+      this.fileStatus = fileStatus;
+      this.size = fileStatus.getLen();
+      validSize = 0;
+      this.compactionRatio = compactionRatio;
+      this.compactionMergeableSize = compactionMergeableSize;
+    }
+
+    /**
+     * Adds the size of a valid cell to this file's valid size.
+     * @param size The size to be added.
+     */
+    public void addValidSize(int size) {
+      this.validSize += size;
+    }
+
+    /**
+     * Whether the mob file needs to be cleaned.
+     * If too many cells are deleted in this mob file, it needs to be cleaned.
+     * @return True if it needs to be cleaned.
+     */
+    public boolean needClean() {
+      return validSize < compactionRatio * size;
+    }
+
+    /**
+     * Whether the mob file needs to be merged.
+     * If this mob file is too small, it needs to be merged.
+     * @return True if it needs to be merged.
+     */
+    public boolean needMerge() {
+      return this.size < compactionMergeableSize;
+    }
+
+    /**
+     * Gets the file status.
+     * @return The file status.
+     */
+    public FileStatus getFileStatus() {
+      return fileStatus;
+    }
+  }
+}
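
Note on the merge criterion above: in execute(), a file is rewritten when needClean() is true, or when mergeSmall is set and needMerge() is true. A minimal sketch of that arithmetic with hypothetical numbers (illustrative only, not part of the patch):

    // Sketch only: mirrors MobFileStatus.needClean()/needMerge() from the patch above.
    long fileSize = 10L * 1024 * 1024;     // hypothetical: 10MB mob file on disk
    long validSize = 2L * 1024 * 1024;     // hypothetical: 2MB of cells still referenced in HBase
    float compactionRatio = 0.5f;          // hypothetical MOB_SWEEP_TOOL_COMPACTION_RATIO value
    long mergeableSize = 4L * 1024 * 1024; // hypothetical ..._COMPACTION_MERGEABLE_SIZE value
    boolean needClean = validSize < compactionRatio * fileSize; // true: most cells are deleted
    boolean needMerge = fileSize < mergeableSize;               // false: not a small file
    // the reducer rewrites the file when needClean || (mergeSmall && needMerge)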

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
new file mode 100644
index 0000000..d71dc83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The sweep tool. It deletes the mob files that are not used and merges the small mob files into
+ * bigger ones. Each run of this sweep tool only handles one column family. Runs on the same
+ * column family are mutually exclusive, and a major compaction and the sweep tool on the same
+ * column family are mutually exclusive too.
+ */
+@InterfaceAudience.Public
+public class Sweeper extends Configured implements Tool {
+
+  /**
+   * Sweeps the mob files on one column family. It deletes the unused mob files and merges
+   * the small mob files into bigger ones.
+   * @param tableName The current table name in string format.
+   * @param familyName The column family name.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws KeeperException
+   * @throws ServiceException
+   */
+  void sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
+      ClassNotFoundException, KeeperException, ServiceException {
+    Configuration conf = getConf();
+    // make sure the target HBase exists.
+    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      TableName tn = TableName.valueOf(tableName);
+      HTableDescriptor htd = admin.getTableDescriptor(tn);
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (family == null || !MobUtils.isMobFamily(family)) {
+        throw new IOException("Column family " + familyName + " is not a MOB column family");
+      }
+      SweepJob job = new SweepJob(conf, fs);
+      // Run the sweeping
+      job.sweep(tn, family);
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        System.out.println("Failed to close the HBaseAdmin: " + e.getMessage());
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    ToolRunner.run(conf, new Sweeper(), args);
+  }
+
+  private void printUsage() {
+    System.err.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName()
+        + " tableName familyName");
+    System.err.println(" tableName        The table name");
+    System.err.println(" familyName       The column family name");
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    String table = args[0];
+    String family = args[1];
+    sweepFamily(table, family);
+    return 0;
+  }
+}
\ No newline at end of file
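
Since Sweeper is a standard Hadoop Tool, it can also be invoked programmatically the same way main() above does. A minimal sketch, with placeholder table and family names:

    // Sketch: programmatic invocation of the sweep tool, mirroring main() above.
    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new Sweeper(),
        new String[] { "someTable", "someFamily" }); // placeholders: tableName familyName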

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 9c6f34e..071b5fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -18,13 +18,17 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.NavigableSet;
 import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -32,7 +36,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -44,7 +51,10 @@ import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * The store implementation to save MOBs (medium objects), it extends the HStore.
@@ -68,6 +78,7 @@ public class HMobStore extends HStore {
   private MobCacheConfig mobCacheConfig;
   private Path homePath;
   private Path mobFamilyPath;
+  private List<Path> mobDirLocations;
 
   public HMobStore(final HRegion region, final HColumnDescriptor family,
       final Configuration confParam) throws IOException {
@@ -76,6 +87,11 @@ public class HMobStore extends HStore {
     this.homePath = MobUtils.getMobHome(conf);
     this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
         family.getNameAsString());
+    mobDirLocations = new ArrayList<Path>();
+    mobDirLocations.add(mobFamilyPath);
+    TableName tn = region.getTableDesc().getTableName();
+    mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
+        .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
   }
 
   /**
@@ -87,6 +103,13 @@ public class HMobStore extends HStore {
   }
 
   /**
+   * Gets current config.
+   */
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  /**
   * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, an additional seek in
   * the mob files should be performed after the seek in HBase is done.
    */
@@ -94,6 +117,15 @@ public class HMobStore extends HStore {
   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
       long readPt, KeyValueScanner scanner) throws IOException {
     if (scanner == null) {
+      if (MobUtils.isRefOnlyScan(scan)) {
+        Filter refOnlyFilter = new MobReferenceOnlyFilter();
+        Filter filter = scan.getFilter();
+        if (filter != null) {
+          scan.setFilter(new FilterList(filter, refOnlyFilter));
+        } else {
+          scan.setFilter(refOnlyFilter);
+        }
+      }
       scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
           targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
     }
@@ -219,30 +251,10 @@ public class HMobStore extends HStore {
    */
   public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
     Cell result = null;
-    if (MobUtils.isValidMobRefCellValue(reference)) {
+    if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
-      Path targetPath = new Path(mobFamilyPath, fileName);
-      MobFile file = null;
-      try {
-        file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
-            mobCacheConfig);
-        result = file.readCell(reference, cacheBlocks);
-      } catch (IOException e) {
-        LOG.error("Fail to open/read the mob file " + targetPath.toString(), e);
-      } catch (NullPointerException e) {
-        // When delete the file during the scan, the hdfs getBlockRange will
-        // throw NullPointerException, catch it and manage it.
-        LOG.error("Fail to read the mob file " + targetPath.toString()
-            + " since it's already deleted", e);
-      } finally {
-        if (file != null) {
-          mobCacheConfig.getMobFileCache().closeFile(file);
-        }
-      }
-    } else {
-      LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short");
+      result = readCell(fileName, reference, cacheBlocks);
     }
-
     if (result == null) {
       LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
           + "qualifier,timestamp,type and tags but with an empty value to return.");
@@ -258,10 +270,132 @@ public class HMobStore extends HStore {
   }
 
   /**
+   * Reads the cell from a mob file.
+   * The mob file might be located in different directories.
+   * 1. The working directory.
+   * 2. The archive directory.
+   * Reads the cell from the files located in both of the above directories.
+   * @param fileName The file to be read.
+   * @param search The cell to be searched.
+   * @param cacheMobBlocks Whether the scanner should cache blocks.
+   * @return The found cell. Null if there's no such cell.
+   * @throws IOException
+   */
+  private Cell readCell(String fileName, Cell search, boolean cacheMobBlocks) throws IOException {
+    FileSystem fs = getFileSystem();
+    for (Path location : mobDirLocations) {
+      MobFile file = null;
+      Path path = new Path(location, fileName);
+      try {
+        file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
+        return file.readCell(search, cacheMobBlocks);
+      } catch (IOException e) {
+        mobCacheConfig.getMobFileCache().evictFile(fileName);
+        if (e instanceof FileNotFoundException) {
+          LOG.warn("Failed to read the cell, the mob file " + path + " doesn't exist", e);
+        } else {
+          throw e;
+        }
+      } finally {
+        if (file != null) {
+          mobCacheConfig.getMobFileCache().closeFile(file);
+        }
+      }
+    }
+    LOG.error("The mob file " + fileName + " could not be found in the locations "
+        + mobDirLocations);
+    return null;
+  }
+
+  /**
    * Gets the mob file path.
    * @return The mob file path.
    */
   public Path getPath() {
     return mobFamilyPath;
   }
+
+  /**
+   * The compaction in the mob store.
+   * The cells in this store contain the paths of the mob files. There might be a race
+   * condition between the major compaction and the sweeping of mob files.
+   * In order to avoid this, we need to mutually exclude the running of the major compaction
+   * and the sweeping of mob files.
+   * The minor compaction is not affected.
+   * The major compaction is converted to a minor one when a sweeping is in progress.
+   */
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction) throws IOException {
+    // If it's a major compaction, check whether a sweeper is running.
+    // If yes, change the major compaction to a minor one.
+    if (compaction.getRequest().isMajor()) {
+      // Use the Zookeeper to coordinate.
+      // 1. Acquire an operation lock.
+      //   1.1. If it cannot be acquired, convert the major compaction to a minor one and
+      //        continue the compaction.
+      //   1.2. If the lock is obtained, search the node of sweeping.
+      //      1.2.1. If the node is there, the sweeping is in progress, convert the major
+      //             compaction to a minor one and continue the compaction.
+      //      1.2.2. If the node is not there, add a child to the major compaction node, and
+      //             run the compaction directly.
+      String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
+      MobZookeeper zk = null;
+      try {
+        zk = MobZookeeper.newInstance(this.conf, compactionName);
+      } catch (KeeperException e) {
+        LOG.error("Cannot connect to the zookeeper, ready to perform the minor compaction instead",
+            e);
+        // change the major compaction into a minor one
+        compaction.getRequest().setIsMajor(false, false);
+        return super.compact(compaction);
+      }
+      boolean major = false;
+      try {
+        // try to acquire the operation lock.
+        if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) {
+          try {
+            LOG.info("Obtained the lock for the store[" + this
+                + "], ready to perform the major compaction");
+            // check the sweeping node to find out whether the sweeping is in progress.
+            boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(),
+                getFamily().getNameAsString());
+            if (!hasSweeper) {
+              // if not, add a child to the major compaction node of this store.
+              major = zk.addMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
+                  .getNameAsString(), compactionName);
+            }
+          } catch (Exception e) {
+            LOG.error("Failed to handle the Zookeeper", e);
+          } finally {
+            // release the operation lock
+            zk.unlockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString());
+          }
+        }
+        try {
+          if (major) {
+            return super.compact(compaction);
+          } else {
+            LOG.warn("Cannot obtain the lock or a sweep tool is running on this store["
+                + this + "], ready to perform the minor compaction instead");
+            // change the major compaction into a minor one
+            compaction.getRequest().setIsMajor(false, false);
+            return super.compact(compaction);
+          }
+        } finally {
+          if (major) {
+            try {
+              zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
+                  .getNameAsString(), compactionName);
+            } catch (KeeperException e) {
+              LOG.error("Failed to delete the compaction znode " + compactionName, e);
+            }
+          }
+        }
+      } finally {
+        zk.close();
+      }
+    } else {
+      // If it's not a major compaction, continue the compaction.
+      return super.compact(compaction);
+    }
+  }
 }
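
The coordination protocol documented in compact() above reduces to: run as major only when the per-family operation lock is acquired and no sweeper znode exists; in every other case, downgrade. A condensed sketch of that decision, reusing the helpers from the patch (zk, compaction, tableName, familyName and compactionName assumed in scope; error handling omitted):

    // Condensed sketch of the downgrade decision in HMobStore.compact() above.
    boolean major = false;
    if (zk.lockColumnFamily(tableName, familyName)) {
      try {
        if (!zk.isSweeperZNodeExist(tableName, familyName)) {
          major = zk.addMajorCompactionZNode(tableName, familyName, compactionName);
        }
      } finally {
        zk.unlockColumnFamily(tableName, familyName); // always release the operation lock
      }
    }
    if (!major) {
      compaction.getRequest().setIsMajor(false, false); // downgrade to a minor compaction
    }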

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
new file mode 100644
index 0000000..10aea24
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * A filter that returns the cells which have mob reference tags. It's a server-side filter.
+ */
+@InterfaceAudience.Private
+class MobReferenceOnlyFilter extends FilterBase {
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) {
+    if (null != cell) {
+      // If a cell has a mob reference tag, it's included.
+      if (MobUtils.isMobReferenceCell(cell)) {
+        return ReturnCode.INCLUDE;
+      }
+    }
+    return ReturnCode.SKIP;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
new file mode 100644
index 0000000..7cba86c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestExpiredMobFileCleaner {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
+  private final static String family = "family";
+  private final static byte[] row1 = Bytes.toBytes("row1");
+  private final static byte[] row2 = Bytes.toBytes("row2");
+  private final static byte[] qf = Bytes.toBytes("qf");
+
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.close();
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
+  private void init() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false, false);
+  }
+
+  private void modifyColumnExpiryDays(int expireDays) throws Exception {
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    // change the ttl to expireDays to make some rows expire
+    int timeToLive = expireDays * secondsOfDay();
+    hcd.setTimeToLive(timeToLive);
+
+    admin.modifyColumn(tableName, hcd);
+  }
+
+  private void putKVAndFlush(HTable table, byte[] row, byte[] value, long ts)
+      throws Exception {
+
+    Put put = new Put(row, ts);
+    put.add(Bytes.toBytes(family), qf, value);
+    table.put(put);
+
+    table.flushCommits();
+    admin.flush(tableName);
+  }
+
+  /**
+   * Creates a 3 day old hfile and a 1 day old hfile, then sets the expiry to 2 days.
+   * Verifies that the 3 day old hfile is removed but the 1 day one is still present
+   * after the expiry based cleaner is run.
+   */
+  @Test
+  public void testCleaner() throws Exception {
+    init();
+
+    Path mobDirPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+
+    byte[] dummyData = makeDummyData(600);
+    long ts = System.currentTimeMillis() - 3 * secondsOfDay() * 1000; // 3 days before
+    putKVAndFlush(table, row1, dummyData, ts);
+    FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //the first mob file
+    assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
+    String firstFile = firstFiles[0].getPath().getName();
+
+    ts = System.currentTimeMillis() - 1 * secondsOfDay() * 1000; // 1 day before
+    putKVAndFlush(table, row2, dummyData, ts);
+    FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //now there are 2 mob files
+    assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
+    String f1 = secondFiles[0].getPath().getName();
+    String f2 = secondFiles[1].getPath().getName();
+    String secondFile = f1.equals(firstFile) ? f2 : f1;
+
+    modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+    //run the cleaner
+    String[] args = new String[2];
+    args[0] = tableName.getNameAsString();
+    args[1] = family;
+    ToolRunner.run(TEST_UTIL.getConfiguration(), new ExpiredMobFileCleaner(), args);
+
+    FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //the first mob file is removed
+    assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+    String lastFile = filesAfterClean[0].getPath().getName();
+    assertEquals("After cleanup without delay 2", secondFile, lastFile);
+  }
+
+  private Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+    Path p = new Path(MobUtils.getMobRegionPath(conf, tableName), familyName);
+    return p;
+  }
+
+  private int secondsOfDay() {
+    return 24 * 3600;
+  }
+
+  private byte[] makeDummyData(int size) {
+    byte [] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+}
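
The timestamps in testCleaner() work because a cell is expired once its timestamp falls behind now minus the TTL. An illustrative check of the arithmetic the test relies on (not part of the patch):

    // Illustrative: the expiry arithmetic behind testCleaner() above.
    long now = System.currentTimeMillis();
    long ts3DaysOld = now - 3L * 24 * 3600 * 1000; // first row's timestamp
    long ts1DayOld = now - 1L * 24 * 3600 * 1000;  // second row's timestamp
    long ttlMillis = 2L * 24 * 3600 * 1000;        // ttl modified to 2 days
    boolean firstExpired = ts3DaysOld < now - ttlMillis;  // true: its mob file is cleaned
    boolean secondExpired = ts1DayOld < now - ttlMillis;  // false: its mob file is kept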

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
new file mode 100644
index 0000000..e0b9a83
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweepJob {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
+      String[] filesNames) throws IOException {
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
+        String.class, String.class);
+    try {
+      for (String fileName : filesNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  @Test
+  public void testSweeperJobWithOutUnusedFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+        TEST_UTIL.getConfiguration());
+    Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/0/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+        vistiedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+        allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+        "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(vistiedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+        "2", "3"});
+    Path r1 = new Path(vistiedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"});
+    Path r2 = new Path(vistiedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+
+  @Test
+  public void testSweeperJobWithUnusedFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+        TEST_UTIL.getConfiguration());
+    Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/1/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+        vistiedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+        allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+        "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(vistiedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+        "2", "3"});
+    Path r1 = new Path(vistiedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5"});
+    Path r2 = new Path(vistiedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(2, toBeArchived.size());
+    assertArrayEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
+  }
+
+  @Test
+  public void testSweeperJobWithRedundantFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+        TEST_UTIL.getConfiguration());
+    Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/2/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+        vistiedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+        allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+        "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(vistiedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+        "2", "3"});
+    Path r1 = new Path(vistiedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"});
+    Path r2 = new Path(vistiedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+}
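
The three tests above exercise what is effectively a set difference: a file is unused only when it appears in the all-names file but in none of the visited partitions (extra visited names like "7" are simply ignored). A plain-Java sketch of that invariant, using the data from testSweeperJobWithUnusedFile (a simplification; the real getUnusedFiles() streams sequence files):

    // Sketch (assumes java.util.Arrays/Set/TreeSet): allNames minus the union of visited names.
    Set<String> all = new TreeSet<String>(Arrays.asList("1", "2", "3", "4", "5", "6"));
    Set<String> visited = new TreeSet<String>();
    visited.addAll(Arrays.asList("1", "2", "3")); // r0
    visited.addAll(Arrays.asList("1", "5"));      // r1
    visited.addAll(Arrays.asList("2", "3"));      // r2
    all.removeAll(visited);                       // leaves [4, 6]: the files to archive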

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
new file mode 100644
index 0000000..a7e2538
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestMobSweepMapper {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void TestMap() throws Exception {
+    String prefix = "0000";
+    final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
+    final String mobFilePath = prefix + fileName;
+
+    ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
+    final KeyValue[] kvList = new KeyValue[1];
+    kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
+            Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
+
+    Result columns = mock(Result.class);
+    when(columns.raw()).thenReturn(kvList);
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1"));
+
+    Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
+            mock(Mapper.Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+    SweepMapper map = new SweepMapper();
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Text text = (Text) invocation.getArguments()[0];
+        KeyValue kv = (KeyValue) invocation.getArguments()[1];
+
+        assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
+        assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
+
+        return null;
+      }
+    }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
+    map.map(r, columns, ctx);
+  }
+}

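A note on the input this mapper test builds: a mob reference cell value is the 4-byte length
of the original cell value followed by the name of the mob file that holds it (the layout
written by MobUtils.createMobRefKeyValue later in this patch), which is why the test prepends
the 4-character placeholder "0000" and expects only the file name to come back out. A minimal
sketch of composing and decoding that layout with the Bytes utility; the class name and the
sample length are illustrative only:

    import org.apache.hadoop.hbase.util.Bytes;

    public class MobRefValueSketch {
      public static void main(String[] args) {
        byte[] fileName = Bytes.toBytes("19691231f2cd014ea28f42788214560a21a44cef");
        // compose: the 4-byte length of the original mob value, then the mob file name
        byte[] refValue = Bytes.add(Bytes.toBytes(100), fileName);
        // decode: the first 4 bytes are the value length, the rest is the file name
        int valueLength = Bytes.toInt(refValue, 0, Bytes.SIZEOF_INT);
        String decodedName = Bytes.toString(refValue, Bytes.SIZEOF_INT,
            refValue.length - Bytes.SIZEOF_INT);
        System.out.println(valueLength + " -> " + decodedName);
      }
    }
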
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
new file mode 100644
index 0000000..0f4c3ff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String tableName = "testSweepReducer";
+  private final static String row = "row";
+  private final static String family = "family";
+  private final static String qf = "qf";
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+                                              Configuration conf) throws Exception {
+    List<String> list = new ArrayList<String>();
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+
+    String next = (String) reader.next((String) null);
+    while (next != null) {
+      list.add(next);
+      next = (String) reader.next((String) null);
+    }
+    reader.close();
+    return list;
+  }
+
+  @Test
+  public void testRun() throws Exception {
+
+    byte[] mobValueBytes = new byte[100];
+
+    //get the path where the mob files are stored
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(tableName), family);
+
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
+    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    table.put(put);
+    table.put(put2);
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //check that a mob file has been generated
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+            JavaSerialization.class.getName());
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+        System.currentTimeMillis() + 24 * 3600 * 1000);
+
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    zk.addSweeperZNode(tableName, family, Bytes.toBytes("1"));
+
+    //use a single shared counter instance when mocking the context
+    Counter counter = new GenericCounter();
+    Reducer<Text, KeyValue, Writable, Writable>.Context ctx =
+            mock(Reducer.Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+    when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+    when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+    when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+    byte[] refBytes = Bytes.toBytes(mobFile1);
+    int valueLength = refBytes.length;
+    byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+    KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
+            Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue);
+    List<KeyValue> list = new ArrayList<KeyValue>();
+    list.add(kv2);
+
+    when(ctx.getValues()).thenReturn(list);
+
+    SweepReducer reducer = new SweepReducer();
+    reducer.run(ctx);
+
+    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    String mobFile2 = fileStatuses2[0].getPath().getName();
+    //a new mob file has been generated and the old one has been archived
+    assertEquals(1, fileStatuses2.length);
+    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+    //check the sequence file that records the visited mob file names
+    String workingPath = configuration.get("mob.compaction.visited.dir");
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus st : statuses) {
+      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+              st.getPath(), configuration));
+    }
+    assertEquals(1, files.size());
+    assertEquals(true, files.contains(mobFile1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
new file mode 100644
index 0000000..c43bceb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweeper {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private String tableName;
+  private final static String row = "row_";
+  private final static String family = "family";
+  private final static String column = "column";
+  private static HTable table;
+  private static Admin admin;
+
+  private Random random = new Random();
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster();
+
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    long tid = System.currentTimeMillis();
+    tableName = "testSweeper" + tid;
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false);
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
+                                String familyName) {
+    Path p = new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
+            familyName);
+    return p;
+  }
+
+  private String mergeString(Set<String> set) {
+    StringBuilder sb = new StringBuilder();
+    for (String s : set)
+      sb.append(s);
+    return sb.toString();
+  }
+
+  private void generateMobTable(int count, int flushStep)
+          throws IOException, InterruptedException {
+    if (count <= 0 || flushStep <= 0)
+      return;
+    int index = 0;
+    for (int i = 0; i < count; i++) {
+      byte[] mobVal = new byte[101*1024];
+      random.nextBytes(mobVal);
+
+      Put put = new Put(Bytes.toBytes(row + i));
+      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
+      table.put(put);
+      if (index++ % flushStep == 0) {
+        table.flushCommits();
+        admin.flush(TableName.valueOf(tableName));
+      }
+    }
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+  }
+
+  @Test
+  public void testSweeper() throws Exception {
+
+    int count = 10;
+    //create table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    //get mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    //scan the table and retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+          Bytes.toBytes(column));
+      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    //there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    //check that the correct mob file references are stored
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 24 * 60 * 60 * 1000);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
+          column));
+      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    assertEquals(10, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+            .equalsIgnoreCase(mobFilesSet.iterator().next()));
+
+  }
+
+  @Test
+  public void testCompactionDelaySweeper() throws Exception {
+
+    int count = 10;
+    //create table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    //get mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    //scan the table and retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+              Bytes.toBytes(column));
+      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    //there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    //check that the correct mob file references are stored
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 0);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
+              column));
+      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    assertEquals(1, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+            .equalsIgnoreCase(mobFilesSet.iterator().next()));
+
+  }
+
+}

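Both tests above drive the sweep tool through Hadoop's ToolRunner with the table name and the
column family name as its two arguments. A minimal sketch of invoking it programmatically,
assuming an HBase configuration is on the classpath; the class, table, and family names are
placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.mapreduce.Sweeper;
    import org.apache.hadoop.util.ToolRunner;

    public class RunSweeper {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // arguments: <tableName> <familyName>, exactly as in the tests above
        int exitCode = ToolRunner.run(conf, new Sweeper(),
            new String[] { "someTable", "family" });
        System.exit(exitCode);
      }
    }
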
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index f8d6ce4..51f4de3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -70,7 +73,7 @@ public class TestMobCompaction {
   @Rule
   public TestName name = new TestName();
   static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
-  private HBaseTestingUtility UTIL = null;
+  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private Configuration conf = null;
 
   private HRegion region = null;
@@ -84,14 +87,22 @@ public class TestMobCompaction {
   private final byte[] STARTROW = Bytes.toBytes(START_KEY);
   private int compactionThreshold;
 
-  private void init(long mobThreshold) throws Exception {
-    this.mobCellThreshold = mobThreshold;
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    UTIL.startMiniCluster(1);
+  }
 
-    UTIL = HBaseTestingUtility.createLocalHTU();
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
 
+  private void init(long mobThreshold) throws Exception {
+    this.mobCellThreshold = mobThreshold;
     conf = UTIL.getConfiguration();
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
-
     htd = UTIL.createTableDescriptor(name.getMethodName());
     hcd = new HColumnDescriptor(COLUMN_FAMILY);
     hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
@@ -158,7 +169,8 @@ public class TestMobCompaction {
     region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
         MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
     region.initialize();
-    region.compactStores(true);
+    region.compactStores();
+
     assertEquals("After compaction: store files", 1, countStoreFiles());
     assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
     assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
@@ -307,7 +319,7 @@ public class TestMobCompaction {
         if (!MobUtils.isMobReferenceCell(kv)) {
           continue;
         }
-        if (!MobUtils.isValidMobRefCellValue(kv)) {
+        if (!MobUtils.hasValidMobRefCellValue(kv)) {
           continue;
         }
         int size = MobUtils.getMobValueLength(kv);

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 69b9c8f..87147d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -120,6 +124,7 @@ public class TestMobStoreScanner {
 	  testGetFromMemStore(false);
     testGetReferences(false);
     testMobThreshold(false);
+    testGetFromArchive(false);
   }
 
   @Test
@@ -128,6 +133,7 @@ public class TestMobStoreScanner {
 	  testGetFromMemStore(true);
     testGetReferences(true);
     testMobThreshold(true);
+    testGetFromArchive(true);
   }
 
   public void testGetFromFiles(boolean reversed) throws Exception {
@@ -282,6 +288,72 @@ public class TestMobStoreScanner {
     results.close();
   }
 
+  public void testGetFromArchive(boolean reversed) throws Exception {
+    String TN = "testGetFromArchive" + reversed;
+    setUp(defaultThreshold, TN);
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    byte[] value = generateMobValue((int) defaultThreshold + 1);
+    // Put some data
+    Put put1 = new Put(row1);
+    put1.add(family, qf1, ts3, value);
+    put1.add(family, qf2, ts2, value);
+    put1.add(family, qf3, ts1, value);
+    table.put(put1);
+
+    table.flushCommits();
+    admin.flush(TN);
+
+    // Get the files in the mob path
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(
+        TEST_UTIL.getConfiguration(), TableName.valueOf(TN)), hcd.getNameAsString());
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    FileStatus[] files = fs.listStatus(mobFamilyPath);
+
+    // Get the archive path
+    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+    Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN));
+    HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN));
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
+        regionInfo, tableDir, family);
+
+    // Move the files from mob path to archive path
+    fs.mkdirs(storeArchiveDir);
+    int fileCount = 0;
+    for (FileStatus file : files) {
+      fileCount++;
+      Path filePath = file.getPath();
+      Path src = new Path(mobFamilyPath, filePath.getName());
+      Path dst = new Path(storeArchiveDir, filePath.getName());
+      fs.rename(src, dst);
+    }
+
+    // Verify that the move succeeded
+    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
+    Assert.assertEquals(0, files1.length);
+    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
+    Assert.assertEquals(fileCount, files2.length);
+
+    // Scan from archive
+    Scan scan = new Scan();
+    setScan(scan, reversed, false);
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      List<Cell> cells = res.listCells();
+      for (Cell cell : cells) {
+        // Verify the value
+        Assert.assertEquals(Bytes.toString(value),
+            Bytes.toString(CellUtil.cloneValue(cell)));
+        count++;
+      }
+    }
+    results.close();
+    Assert.assertEquals(3, count);
+  }
+
   /**
    * Assert the value is not store in mob.
    */

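The new testGetFromArchive path above, like the sweeper tests, leans on the raw and
reference-only scan attributes added by this patch (MobConstants.MOB_SCAN_RAW and
MobConstants.MOB_SCAN_REF_ONLY). A minimal sketch of collecting the mob file names a table
references, following the decoding pattern used in TestMobSweeper; the helper name is
illustrative and the caller is assumed to supply an open table:

    import java.io.IOException;
    import java.util.Set;
    import java.util.TreeSet;

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mob.MobConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RefOnlyScanSketch {
      // Collects the mob file names referenced by the given family:qualifier.
      static Set<String> referencedMobFiles(HTable table, String family, String qualifier)
          throws IOException {
        Scan scan = new Scan();
        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
        scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
        Set<String> files = new TreeSet<String>();
        ResultScanner rs = table.getScanner(scan);
        for (Result res : rs) {
          byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
          // the reference value is a 4-byte length followed by the mob file name
          files.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
              valueBytes.length - Bytes.SIZEOF_INT));
        }
        rs.close();
        return files;
      }
    }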

[2/2] git commit: HBASE-11644 External MOB compaction tools (Jingcheng Du)

Posted by jm...@apache.org.
HBASE-11644 External MOB compaction tools (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/84e957c8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/84e957c8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/84e957c8

Branch: refs/heads/hbase-11339
Commit: 84e957c875ae971578a5b147775445368ea26188
Parents: 9cf46dc
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Sep 19 03:02:06 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Sep 19 03:06:33 2014 -0700

----------------------------------------------------------------------
 .../src/main/resources/hbase-default.xml        |  34 ++
 .../master/ExpiredMobFileCleanerChore.java      |  70 +++
 .../org/apache/hadoop/hbase/master/HMaster.java |   7 +
 .../hadoop/hbase/mob/DefaultMobCompactor.java   |   2 +-
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java | 120 ++++
 .../apache/hadoop/hbase/mob/MobConstants.java   |  22 +
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 285 +++++++++-
 .../apache/hadoop/hbase/mob/MobZookeeper.java   | 270 +++++++++
 .../hbase/mob/mapreduce/MemStoreWrapper.java    | 184 +++++++
 .../mapreduce/MobFilePathHashPartitioner.java   |  41 ++
 .../hadoop/hbase/mob/mapreduce/SweepJob.java    | 550 +++++++++++++++++++
 .../mob/mapreduce/SweepJobNodeTracker.java      |  74 +++
 .../hadoop/hbase/mob/mapreduce/SweepMapper.java |  84 +++
 .../hbase/mob/mapreduce/SweepReducer.java       | 506 +++++++++++++++++
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     | 108 ++++
 .../hadoop/hbase/regionserver/HMobStore.java    | 180 +++++-
 .../regionserver/MobReferenceOnlyFilter.java    |  42 ++
 .../hbase/mob/TestExpiredMobFileCleaner.java    | 180 ++++++
 .../hbase/mob/mapreduce/TestMobSweepJob.java    | 168 ++++++
 .../hbase/mob/mapreduce/TestMobSweepMapper.java | 100 ++++
 .../mob/mapreduce/TestMobSweepReducer.java      | 207 +++++++
 .../hbase/mob/mapreduce/TestMobSweeper.java     | 306 +++++++++++
 .../hbase/regionserver/TestMobCompaction.java   |  26 +-
 .../hbase/regionserver/TestMobStoreScanner.java |  72 +++
 24 files changed, 3594 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 872bbc5..647defd 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1483,4 +1483,38 @@ possible configurations would overwhelm and obscure the important.
       The default value is 0.5f.
     </description>
   </property>
+  <property>
+    <name>hbase.mob.sweep.tool.compaction.ratio</name>
+    <value>0.5f</value>
+    <description>
+      If too many cells are deleted in a mob file, it is regarded
+      as an invalid file and needs to be merged.
+      If existingCellsSize/mobFileSize is less than this ratio, the file
+      is regarded as invalid. The default value is 0.5f.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.sweep.tool.compaction.mergeable.size</name>
+    <value>134217728</value>
+    <description>
+      If the size of a mob file is less than this value, it's regarded as a small
+      file and needs to be merged. The default value is 128MB.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name>
+    <value>134217728</value>
+    <description>
+      The flush size for the memstore used by the sweep job. Each sweep reducer owns one such memstore.
+      The default value is 128MB.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.mob.ttl.cleaner.period</name>
+    <value>86400000</value>
+    <description>
+      The period at which the ExpiredMobFileCleanerChore runs, in milliseconds.
+      The default value is one day.
+    </description>
+  </property>
 </configuration>

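The tests in this patch set these properties programmatically rather than through
hbase-site.xml. A minimal sketch of tuning the new knobs on a client-side Configuration,
using only keys introduced above; the class name and chosen values are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class SweepConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // merge a mob file once less than 60% of its size is live cells
        conf.setFloat("hbase.mob.sweep.tool.compaction.ratio", 0.6f);
        // treat mob files under 64MB as small and mergeable
        conf.setLong("hbase.mob.sweep.tool.compaction.mergeable.size", 64L * 1024 * 1024);
        // run the expired-mob-file cleaner chore every 12 hours
        conf.setInt("hbase.master.mob.ttl.cleaner.period", 12 * 60 * 60 * 1000);
      }
    }
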
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
new file mode 100644
index 0000000..98fe236
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * A chore that regularly runs the ExpiredMobFileCleaner to remove the expired
+ * mob files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleanerChore extends Chore {
+
+  private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class);
+  private final HMaster master;
+  private ExpiredMobFileCleaner cleaner;
+
+  public ExpiredMobFileCleanerChore(HMaster master) {
+    super(master.getServerName() + "-ExpiredMobFileCleanerChore", master.getConfiguration().getInt(
+        MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master);
+    this.master = master;
+    cleaner = new ExpiredMobFileCleaner();
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      TableDescriptors htds = master.getTableDescriptors();
+      Map<String, HTableDescriptor> map = htds.getAll();
+      for (HTableDescriptor htd : map.values()) {
+        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+          if (MobUtils.isMobFamily(hcd) && hcd.getMinVersions() == 0) {
+            cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Fail to clean the expired mob files", e);
+    }
+  }
+
+}

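The chore only acts on families that are MOB families with minVersions 0, and
MobUtils.cleanExpiredMobFiles (further down in this patch) skips families whose TTL is unset.
A minimal sketch of a column family descriptor this chore would clean, mirroring the
descriptors built in the tests; the threshold and TTL values are illustrative:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.mob.MobConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobFamilySketch {
      public static HColumnDescriptor expiringMobFamily() {
        HColumnDescriptor hcd = new HColumnDescriptor("family");
        hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
        hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(100L));
        hcd.setTimeToLive(7 * 24 * 60 * 60); // seven days, in seconds
        hcd.setMinVersions(0);               // required, otherwise the cleaner skips the family
        return hcd;
      }
    }
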
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 714b5a8..4ff3592 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -208,6 +208,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   CatalogJanitor catalogJanitorChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
+  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
 
   MasterCoprocessorHost cpHost;
 
@@ -610,6 +611,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     // master initialization. See HBASE-5916.
     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
 
+    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
+    Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
+
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
       try {
@@ -856,6 +860,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   }
 
   private void stopChores() {
+    if (this.expiredMobFileCleanerChore != null) {
+      this.expiredMobFileCleanerChore.interrupt();
+    }
     if (this.balancerChore != null) {
       this.balancerChore.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
index 5f13502..fd35a15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@ -152,7 +152,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
             // to the store file.
             writer.append(kv);
           } else if (MobUtils.isMobReferenceCell(kv)) {
-            if (MobUtils.isValidMobRefCellValue(kv)) {
+            if (MobUtils.hasValidMobRefCellValue(kv)) {
               int size = MobUtils.getMobValueLength(kv);
               if (size > mobSizeThreshold) {
                 // If the value size is larger than the threshold, it's regarded as a mob. Since

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
new file mode 100644
index 0000000..d3c11ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The cleaner to delete the expired MOB files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleaner extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleaner.class);
+  /**
+   * Cleans the MOB files that are expired, provided the min versions of the
+   * column family is 0. If the latest timestamp of the cells in a MOB file is
+   * older than the TTL of the column family, the file is regarded as expired
+   * and this cleaner deletes it.
+   * Suppose the cells in a mob file M0 expire at time T0. A scan started before T0 can
+   * still read those mob cells after T0: if the cleaner runs at a later time T1, M0 is
+   * archived rather than deleted outright, so it remains readable from the archive directory.
+   * @param tableName The current table name.
+   * @param family The current family.
+   * @throws ServiceException
+   * @throws IOException
+   */
+  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
+      throws ServiceException, IOException {
+    Configuration conf = getConf();
+    TableName tn = TableName.valueOf(tableName);
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
+    // disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
+    MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
+        EnvironmentEdgeManager.currentTime());
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    ToolRunner.run(conf, new ExpiredMobFileCleaner(), args);
+  }
+
+  private void printUsage() {
+    System.err.println("Usage:\n" + "--------------------------\n"
+        + ExpiredMobFileCleaner.class.getName() + " tableName familyName");
+    System.err.println(" tableName        The table name");
+    System.err.println(" familyName       The column family name");
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    String tableName = args[0];
+    String familyName = args[1];
+    TableName tn = TableName.valueOf(tableName);
+    HBaseAdmin.checkHBaseAvailable(getConf());
+    HBaseAdmin admin = new HBaseAdmin(getConf());
+    try {
+      HTableDescriptor htd = admin.getTableDescriptor(tn);
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (family == null || !MobUtils.isMobFamily(family)) {
+        throw new IOException("Column family " + familyName + " is not a MOB column family");
+      }
+      if (family.getMinVersions() > 0) {
+        throw new IOException(
+            "The minVersions of the column family is not 0, could not be handled by this cleaner");
+      }
+      cleanExpiredMobFiles(tableName, family);
+      return 0;
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.error("Fail to close the HBaseAdmin.", e);
+      }
+    }
+  }
+
+}

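Like the sweeper, the cleaner can be run on demand as a Tool, taking the table and family
names as its two arguments per printUsage() above. A minimal sketch; the class, table, and
family names are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
    import org.apache.hadoop.util.ToolRunner;

    public class RunExpiredMobFileCleaner {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // arguments: <tableName> <familyName>; the family must be a MOB family
        // with minVersions 0, or the tool raises an IOException
        System.exit(ToolRunner.run(conf, new ExpiredMobFileCleaner(),
            new String[] { "someTable", "family" }));
      }
    }
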
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 9978afd..4e3e7c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -38,6 +38,7 @@ public class MobConstants {
 
   public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
   public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+  public static final String MOB_SCAN_REF_ONLY = "hbase.mob.scan.ref.only";
 
   public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
   public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
@@ -46,6 +47,26 @@ public class MobConstants {
   public static final String MOB_REGION_NAME = ".mob";
   public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
 
+  public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period";
+  public static final int DEFAULT_MOB_CLEANER_PERIOD = 24 * 60 * 60 * 1000; // one day
+
+  public static final String MOB_SWEEP_TOOL_COMPACTION_START_DATE =
+      "hbase.mob.sweep.tool.compaction.start.date";
+  public static final String MOB_SWEEP_TOOL_COMPACTION_RATIO =
+      "hbase.mob.sweep.tool.compaction.ratio";
+  public static final String MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE =
+      "hbase.mob.sweep.tool.compaction.mergeable.size";
+
+  public static final float DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO = 0.5f;
+  public static final long DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE = 128 * 1024 * 1024;
+
+  public static final String MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME = "mobcompaction";
+
+  public static final String MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      "hbase.mob.sweep.tool.compaction.memstore.flush.size";
+  public static final long DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      1024 * 1024 * 128; // 128M
+
   public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
   public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
   public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
@@ -55,6 +76,7 @@ public class MobConstants {
   public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
 
   public final static String TEMP_DIR_NAME = ".tmp";
+  public final static String EMPTY_STRING = "";
   private MobConstants() {
 
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index e52d336..e49d3ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -18,13 +18,22 @@
  */
 package org.apache.hadoop.hbase.mob;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -34,7 +43,16 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -44,6 +62,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
 @InterfaceAudience.Private
 public class MobUtils {
 
+  private static final Log LOG = LogFactory.getLog(MobUtils.class);
+  private static final String COMPACTION_WORKING_DIR_NAME = "working";
+
   private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
       new ThreadLocal<SimpleDateFormat>() {
     @Override
@@ -143,6 +164,22 @@ public class MobUtils {
   }
 
   /**
+   * Indicates whether it's a reference-only scan.
+   * The information is set in the scan attribute "hbase.mob.scan.ref.only".
+   * In a reference-only scan, only the cells with a ref tag are returned.
+   * @param scan The current scan.
+   * @return True if it's a ref only scan.
+   */
+  public static boolean isRefOnlyScan(Scan scan) {
+    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
+    try {
+      return refOnly != null && Bytes.toBoolean(refOnly);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
    * Indicates whether the scan contains the information of caching blocks.
    * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
    * @param scan The current scan.
@@ -172,6 +209,91 @@ public class MobUtils {
   }
 
   /**
+   * Cleans the expired mob files.
+   * Cleans the files whose creation date is older than (current - columnFamily.ttl),
+   * provided the minVersions of that column family is 0.
+   * @param fs The current file system.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param columnDescriptor The descriptor of the current column family.
+   * @param cacheConfig The cacheConfig that disables the block cache.
+   * @param current The current time.
+   * @throws IOException
+   */
+  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
+      HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
+      throws IOException {
+    long timeToLive = columnDescriptor.getTimeToLive();
+    if (Integer.MAX_VALUE == timeToLive) {
+      // no need to clean, because the TTL is not set.
+      return;
+    }
+
+    Date expireDate = new Date(current - timeToLive * 1000);
+    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
+
+    FileStatus[] stats = null;
+    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
+    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
+    try {
+      stats = fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Fail to find the mob file " + path, e);
+    }
+    if (null == stats) {
+      // no file found
+      return;
+    }
+    List<StoreFile> filesToClean = new ArrayList<StoreFile>();
+    int deletedFileCount = 0;
+    for (FileStatus file : stats) {
+      String fileName = file.getPath().getName();
+      try {
+        MobFileName mobFileName = null;
+        if (!HFileLink.isHFileLink(file.getPath())) {
+          mobFileName = MobFileName.create(fileName);
+        } else {
+          HFileLink hfileLink = new HFileLink(conf, file.getPath());
+          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+        }
+        Date fileDate = parseDate(mobFileName.getDate());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking file " + fileName);
+        }
+        if (fileDate.getTime() < expireDate.getTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(fileName + " is an expired file");
+          }
+          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot parse the fileName " + fileName, e);
+      }
+    }
+    if (!filesToClean.isEmpty()) {
+      try {
+        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
+            filesToClean);
+        deletedFileCount = filesToClean.size();
+      } catch (IOException e) {
+        LOG.error("Fail to delete the mob files " + filesToClean, e);
+      }
+    }
+    LOG.info(deletedFileCount + " expired mob files were deleted");
+  }
+
+  /**
+   * Gets the znode name of column family.
+   * @param tableName The current table name.
+   * @param familyName The name of the current column family.
+   * @return The znode name of column family.
+   */
+  public static String getColumnFamilyZNodeName(String tableName, String familyName) {
+    return tableName + ":" + familyName;
+  }
+
+  /**
    * Gets the root dir of the mob files.
    * It's {HBASE_DIR}/mobdir.
    * @param conf The current configuration.
@@ -183,6 +305,19 @@ public class MobUtils {
   }
 
   /**
+   * Gets the qualified root dir of the mob files.
+   * @param conf The current configuration.
+   * @return The qualified root dir.
+   * @throws IOException
+   */
+  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+    FileSystem fs = mobRootDir.getFileSystem(conf);
+    return mobRootDir.makeQualified(fs);
+  }
+
+  /**
    * Gets the region dir of the mob files.
    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
    * @param conf The current configuration.
@@ -190,7 +325,7 @@ public class MobUtils {
    * @return The region dir of the mob files.
    */
   public static Path getMobRegionPath(Configuration conf, TableName tableName) {
-    Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
     HRegionInfo regionInfo = getMobRegionInfo(tableName);
     return new Path(tablePath, regionInfo.getEncodedName());
   }
@@ -232,36 +367,160 @@ public class MobUtils {
   }
 
   /**
+   * Gets whether the current HRegionInfo is a mob one.
+   * @param regionInfo The current HRegionInfo.
+   * @return True if the current HRegionInfo is a mob one.
+   */
+  public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
+    return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
+        .equals(regionInfo.getEncodedName());
+  }
+
+  /**
+   * Gets the working directory of the mob compaction.
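+   * The returned path is {root}/{jobName}/{COMPACTION_WORKING_DIR_NAME}.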
+   * @param root The root directory of the mob compaction.
+   * @param jobName The current job name.
+   * @return The directory of the mob compaction for the current job.
+   */
+  public static Path getCompactionWorkingPath(Path root, String jobName) {
+    Path parent = new Path(root, jobName);
+    return new Path(parent, COMPACTION_WORKING_DIR_NAME);
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param tableName The table name.
+   * @param tableDir The table directory.
+   * @param family The name of the column family.
+   * @param storeFiles The files to be deleted.
+   * @throws IOException
+   */
+  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
+      Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
+    HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
+        storeFiles);
+  }
+
+  /**
    * Creates a mob reference KeyValue.
    * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
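    * For example, a mob cell whose value is 1024 bytes long and is written to the mob file
    * fileName gets Bytes.add(Bytes.toBytes(1024), fileName) as its reference value.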
-   * @param kv The original KeyValue.
+   * @param cell The original Cell.
    * @param fileName The mob file name where the mob reference KeyValue is written.
    * @param tableNameTag The tag of the current table name. It's very important in
    *                        cloning the snapshot.
    * @return The mob reference KeyValue.
    */
-  public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+  public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
     // Append the tags to the KeyValue.
     // The key is same, the value is the filename of the mob file
-    List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
-    existingTags.add(MobConstants.MOB_REF_TAG);
+    List<Tag> tags = new ArrayList<Tag>();
+    // Add the ref tag as the 1st one.
+    tags.add(MobConstants.MOB_REF_TAG);
+    // Add the existing tags.
+    tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
     // Add the tag of the source table name, this table is where this mob file is flushed
     // from.
     // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
     // find the original mob files by this table name. For details please see cloning
     // snapshot for mob files.
-    existingTags.add(tableNameTag);
-    int valueLength = kv.getValueLength();
+    tags.add(tableNameTag);
+    int valueLength = cell.getValueLength();
     byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
-    KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
-        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
-        kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
-        refValue, 0, refValue.length, existingTags);
-    reference.setSequenceId(kv.getSequenceId());
+    KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+        cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
+    reference.setSequenceId(cell.getSequenceId());
     return reference;
   }
 
   /**
+   * Creates a directory of mob files for flushing.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, in the yyyymmdd format.
+   * @param basePath The basic path for a temp directory.
+   * @param maxKeyCount The maximum key count.
+   * @param compression The compression algorithm.
+   * @param startKey The hex string of the start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
+      throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+        .replaceAll("-", ""));
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withIncludesMvcc(false).withIncludesTags(true)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+        .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
+        .withFilePath(new Path(basePath, mobFileName.getFileName()))
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Commits the mob file.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param sourceFile The path where the mob file is saved.
+   * @param targetPath The directory path where the source file is renamed to.
+   * @param cacheConfig The current cache config.
+   * @throws IOException
+   */
+  public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+      Path targetPath, CacheConfig cacheConfig) throws IOException {
+    if (sourceFile == null) {
+      return;
+    }
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateMobFile(conf, fs, sourceFile, cacheConfig);
+    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+    LOG.info(msg);
+    Path parent = dstPath.getParent();
+    if (!fs.exists(parent)) {
+      fs.mkdirs(parent);
+    }
+    if (!fs.rename(sourceFile, dstPath)) {
+      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+    }
+  }
+
+  /**
+   * Validates a mob file by opening and closing it.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param path The path where the mob file is saved.
+   * @param cacheConfig The current cache config.
+   * @throws IOException If the mob file cannot be opened.
+   */
+  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
+      CacheConfig cacheConfig) throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader(false);
+      }
+    }
+  }
+
+  /**
    * Indicates whether the current mob ref cell has a valid value.
    * A mob ref cell has a mob reference tag.
    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
@@ -270,7 +529,7 @@ public class MobUtils {
    * @param cell The mob ref cell.
    * @return True if the cell has a valid value.
    */
-  public static boolean isValidMobRefCellValue(Cell cell) {
+  public static boolean hasValidMobRefCellValue(Cell cell) {
     return cell.getValueLength() > Bytes.SIZEOF_INT;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
new file mode 100644
index 0000000..a9557d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The zookeeper used for MOB.
+ * This zookeeper is used to synchronize the HBase major compaction and sweep tool.
+ * The structure of the mob nodes in zookeeper is:
+ * |--baseNode
+ *     |--MOB
+ *         |--tableName:columnFamilyName-lock // lock for the mob column family
+ *         |--tableName:columnFamilyName-sweeper // added when a sweep tool runs
+ *         |--tableName:columnFamilyName-majorCompaction
+ *              |--UUID // added when a major compaction occurs.
+ * To synchronize the operations between the sweep tool and the HBase major compaction, both
+ * need to acquire the tableName:columnFamilyName-lock before they run.
+ * In the sweep tool:
+ * 1. If it acquires the lock successfully, it checks whether the sweeper node exists. If the
+ * node exists, the current run is aborted. If not, it checks whether there are major
+ * compaction nodes; if there are, the current run is aborted, otherwise it adds a sweeper
+ * node to the zookeeper.
+ * 2. If it could not obtain the lock, the current run is aborted.
+ * In the HBase compaction:
+ * 1. If it's a minor compaction, continue the compaction.
+ * 2. If it's a major compaction, acquire a lock in zookeeper.
+ *    A. If it obtains the lock, it checks whether a sweeper node exists. If it exists, the
+ *    compaction converts itself to a minor one and continues; if not, it adds a major
+ *    compaction node to the zookeeper.
+ *    B. If it could not obtain the lock, it converts itself to a minor one and continues the
+ *    compaction.
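+ *
+ * A minimal usage sketch of the sweep-tool side of this protocol (a hedged example: the
+ * table "t1" and family "f1" are placeholders, and error handling is elided):
+ * <pre>
+ *   MobZookeeper zk = MobZookeeper.newInstance(conf, "sweep-example");
+ *   if (zk.lockColumnFamily("t1", "f1")) {
+ *     try {
+ *       if (!zk.isSweeperZNodeExist("t1", "f1")
+ *           && !zk.hasMajorCompactionChildren("t1", "f1")) {
+ *         zk.addSweeperZNode("t1", "f1", Bytes.toBytes("job-id"));
+ *       }
+ *     } finally {
+ *       zk.unlockColumnFamily("t1", "f1");
+ *     }
+ *   }
+ *   zk.close();
+ * </pre>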
+ */
+@InterfaceAudience.Private
+public class MobZookeeper {
+  // TODO Will remove this class before the mob is merged back to master.
+  private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
+
+  private ZooKeeperWatcher zkw;
+  private String mobZnode;
+  private static final String LOCK_EPHEMERAL = "-lock";
+  private static final String SWEEPER_EPHEMERAL = "-sweeper";
+  private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction";
+
+  private MobZookeeper(Configuration conf, String identifier) throws IOException,
+      KeeperException {
+    this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable());
+    mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
+    if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
+      ZKUtil.createWithParents(zkw, mobZnode);
+    }
+  }
+
+  /**
+   * Creates a new instance of MobZookeeper.
+   * @param conf The current configuration.
+   * @param identifier string that is passed to RecoverableZookeeper to be used as
+   * identifier for this instance.
+   * @return A new instance of MobZookeeper.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException,
+      KeeperException {
+    return new MobZookeeper(conf, identifier);
+  }
+
+  /**
+   * Acquires a lock on the current column family.
+   * All the threads that try to access the column family have to acquire a lock, which is
+   * actually an ephemeral node created in the zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if the lock is obtained successfully. Otherwise false is returned.
+   */
+  public boolean lockColumnFamily(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    boolean locked = false;
+    try {
+      locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(locked ? "Locked the column family " + znodeName
+            : "Can not lock the column family " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to lock the column family " + znodeName, e);
+    }
+    return locked;
+  }
+
+  /**
+   * Releases the lock on the current column family.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   */
+  public void unlockColumnFamily(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unlocking the column family " + znodeName);
+    }
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.warn("Fail to unlock the column family " + znodeName, e);
+    }
+  }
+
+  /**
+   * Adds a node to zookeeper which indicates that a sweep tool is running.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param data the data of the ephemeral node.
+   * @return True if the node is created successfully. Otherwise false is returned.
+   */
+  public boolean addSweeperZNode(String tableName, String familyName, byte[] data) {
+    boolean add = false;
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    try {
+      add = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(add ? "Added a znode for sweeper " + znodeName
+            : "Cannot add a znode for sweeper " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to add a znode for sweeper " + znodeName, e);
+    }
+    return add;
+  }
+
+  /**
+   * Gets the path of the sweeper znode in zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return The path of the sweeper znode in zookeeper.
+   */
+  public String getSweeperZNodePath(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL);
+  }
+
+  /**
+   * Deletes the sweeper node from zookeeper, which indicates that the sweep tool is finished.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   */
+  public void deleteSweeperZNode(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.error("Fail to delete a znode for sweeper " + znodeName, e);
+    }
+  }
+
+  /**
+   * Checks whether the sweeper znode exists in the zookeeper.
+   * If the node exists, it means a sweep tool is running.
+   * Otherwise, it is not.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if the node exists. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
+  }
+
+  /**
+   * Checks whether there are major compaction nodes in the zookeeper.
+   * If there are such nodes, it means major compactions are in progress now.
+   * Otherwise there are not.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if there're major compactions in progress. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean hasMajorCompactionChildren(String tableName, String familyName)
+      throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
+    return children != null && !children.isEmpty();
+  }
+
+  /**
+   * Creates a major compaction node in the zookeeper.
+   * Before an HBase major compaction, such a node is created in the zookeeper. It tells others
+   * that a major compaction is in progress, so the sweep tool cannot run at this time.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param compactionName The current compaction name.
+   * @return True if the node is created successfully. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean addMajorCompactionZNode(String tableName, String familyName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
+  }
+
+  /**
+   * Deletes a major compaction node from the Zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param compactionName The current compaction name.
+   * @throws KeeperException
+   */
+  public void deleteMajorCompactionZNode(String tableName, String familyName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    ZKUtil.deleteNode(zkw, eachMcPath);
+  }
+
+  /**
+   * Closes the MobZookeeper.
+   */
+  public void close() {
+    this.zkw.close();
+  }
+
+  /**
+   * A dummy abortable, used by the MobZookeeper.
+   */
+  public static class DummyMobAbortable implements Abortable {
+
+    private boolean abort = false;
+
+    public void abort(String why, Throwable e) {
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
new file mode 100644
index 0000000..b0d4c9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * The wrapper of a DefaultMemStore.
+ * This wrapper is used in the sweep reducer to buffer and sort the cells written from
+ * the invalid and small mob files.
+ * When it's full it's flushed: the mob data are written to new mob files, and their file
+ * names are written back to the store files of HBase.
+ * This memStore is used to sort the cells in mob files.
+ * In a reducer of the sweep tool, the mob files are grouped by the same prefix (start key
+ * and date); in each group, the reducer iterates the files and reads their cells into a new
+ * and bigger mob file.
+ * The cells in the same mob file are ordered, but cells across mob files are not.
+ * So we need this MemStoreWrapper to sort the cells that come from different mob files
+ * before flushing them to disk; when the memStore is big enough, it's flushed as a new mob
+ * file.
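+ *
+ * A minimal usage sketch (hedged: the reducer context, table, family descriptor and
+ * partition id are assumed to be set up by the sweep job already):
+ * <pre>
+ *   MemStoreWrapper memStore = new MemStoreWrapper(context, fs, table, hcd,
+ *       new DefaultMemStore(), cacheConfig);
+ *   memStore.setPartitionId(partitionId);
+ *   memStore.addToMemstore(kv); // buffers the cell and flushes when the memstore is full
+ *   memStore.flushMemStore();   // final flush at the end of the group
+ * </pre>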
+ */
+@InterfaceAudience.Private
+public class MemStoreWrapper {
+
+  private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+  private MemStore memstore;
+  private long flushSize;
+  private SweepPartitionId partitionId;
+  private Context context;
+  private Configuration conf;
+  private HTable table;
+  private HColumnDescriptor hcd;
+  private Path mobFamilyDir;
+  private FileSystem fs;
+  private CacheConfig cacheConfig;
+
+  public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd,
+      MemStore memstore, CacheConfig cacheConfig) throws IOException {
+    this.memstore = memstore;
+    this.context = context;
+    this.fs = fs;
+    this.table = table;
+    this.hcd = hcd;
+    this.conf = context.getConfiguration();
+    this.cacheConfig = cacheConfig;
+    flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+        MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+    mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+  }
+
+  public void setPartitionId(SweepPartitionId partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Flushes the memstore if the size is large enough.
+   * @throws IOException
+   */
+  private void flushMemStoreIfNecessary() throws IOException {
+    if (memstore.heapSize() >= flushSize) {
+      flushMemStore();
+    }
+  }
+
+  /**
+   * Flushes the memstore regardless of its size.
+   * @throws IOException
+   */
+  public void flushMemStore() throws IOException {
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    internalFlushCache(snapshot);
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  /**
+   * Flushes the snapshot of the memstore.
+   * Flushes the mob data to the mob files, and flushes the name of these mob files to HBase.
+   * @param snapshot The snapshot of the memstore.
+   * @throws IOException
+   */
+  private void internalFlushCache(final MemStoreSnapshot snapshot)
+      throws IOException {
+    if (snapshot.getSize() == 0) {
+      return;
+    }
+    // generate the files into a temp directory.
+    String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+    StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
+        partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
+        hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
+
+    String relativePath = mobFileWriter.getPath().getName();
+    LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
+
+    byte[] referenceValue = Bytes.toBytes(relativePath);
+    int keyValueCount = 0;
+    KeyValueScanner scanner = snapshot.getScanner();
+    Cell cell = null;
+    while (null != (cell = scanner.next())) {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+      mobFileWriter.append(kv);
+      keyValueCount++;
+    }
+    scanner.close();
+    // Write out the log sequence number that corresponds to this output
+    // hfile. The hfile is current up to and including logCacheFlushId.
+    mobFileWriter.appendMetadata(Long.MAX_VALUE, false);
+    mobFileWriter.close();
+
+    MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+    context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+    // write reference/fileName back to the store files of HBase.
+    scanner = snapshot.getScanner();
+    scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    cell = null;
+    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName());
+    while (null != (cell = scanner.next())) {
+      KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+      Put put =
+          new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+      put.add(reference);
+      table.put(put);
+      context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
+    }
+    if (keyValueCount > 0) {
+      table.flushCommits();
+    }
+    scanner.close();
+  }
+
+  /**
+   * Adds a KeyValue into the memstore.
+   * @param kv The KeyValue to be added.
+   * @throws IOException
+   */
+  public void addToMemstore(KeyValue kv) throws IOException {
+    memstore.add(kv);
+    // flush the memstore if it's full.
+    flushMemStoreIfNecessary();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
new file mode 100644
index 0000000..bdec887
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * The partitioner for the sweep job.
+ * The key is a mob file name. We bucket by date.
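+ * For example, with numPartitions = 4, every file name that carries the date "20140919"
+ * goes to partition ("20140919".hashCode() & Integer.MAX_VALUE) % 4, so the files of the
+ * same day are always reduced together.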
+ */
+@InterfaceAudience.Private
+public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
+
+  @Override
+  public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
+    MobFileName mobFileName = MobFileName.create(fileName.toString());
+    String date = mobFileName.getDate();
+    int hash = date.hashCode();
+    return (hash & Integer.MAX_VALUE) % numPartitions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
new file mode 100644
index 0000000..1d048bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -0,0 +1,550 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The sweep job.
+ * Runs MapReduce to merge the smaller mob files into bigger ones and clean up the unused ones.
+ */
+@InterfaceAudience.Private
+public class SweepJob {
+
+  private final FileSystem fs;
+  private final Configuration conf;
+  private static final Log LOG = LogFactory.getLog(SweepJob.class);
+  static final String SWEEP_JOB_ID = "mob.compaction.id";
+  static final String SWEEPER_NODE = "mob.compaction.sweep.node";
+  static final String WORKING_DIR_KEY = "mob.compaction.dir";
+  static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file";
+  static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir";
+  static final String WORKING_ALLNAMES_DIR = "all";
+  static final String WORKING_VISITED_DIR = "visited";
+  public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir";
+  // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
+  public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay";
+  protected static long ONE_DAY = 24 * 60 * 60 * 1000;
+  private long compactionStartTime = EnvironmentEdgeManager.currentTime();
+  public final static String CREDENTIALS_LOCATION = "credentials_location";
+  private CacheConfig cacheConfig;
+  static final int SCAN_CACHING = 10000;
+
+  public SweepJob(Configuration conf, FileSystem fs) {
+    this.conf = conf;
+    this.fs = fs;
+    // disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    cacheConfig = new CacheConfig(copyOfConf);
+  }
+
+  /**
+   * Runs MapReduce to do the sweeping on the mob files.
+   * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
+   * references from 'normal' regions' rows.
+   * Runs of the sweep tool on the same column family are mutually exclusive.
+   * An HBase major compaction and a run of the sweep tool on the same column family are
+   * mutually exclusive as well.
+   * The synchronization is done through the zookeeper.
+   * So at the beginning of a run, we need to make sure that this sweep tool is the only one
+   * currently running on this column family, and that no major compaction of this column
+   * family is in progress.
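+   * A hedged invocation sketch (the table name "t1" and the family descriptor are
+   * placeholders; they normally come from the sweeper tool's arguments):
+   * <pre>
+   *   SweepJob sweepJob = new SweepJob(conf, FileSystem.get(conf));
+   *   sweepJob.sweep(TableName.valueOf("t1"), family);
+   * </pre>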
+   * @param tn The current table name.
+   * @param family The descriptor of the current column family.
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void sweep(TableName tn, HColumnDescriptor family) throws IOException,
+      ClassNotFoundException, InterruptedException, KeeperException {
+    Configuration conf = new Configuration(this.conf);
+    // check whether the current user is the same as the owner of the hbase root dir
+    String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+    FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
+    if (hbaseRootFileStat.length > 0) {
+      String owner = hbaseRootFileStat[0].getOwner();
+      if (!owner.equals(currentUserName)) {
+        String errorMsg = "The current user[" + currentUserName
+            + "] doesn't have hbase root credentials."
+            + " Please make sure the user is the root of the target HBase";
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    } else {
+      LOG.error("The target HBase doesn't exist");
+      throw new IOException("The target HBase doesn't exist");
+    }
+    String familyName = family.getNameAsString();
+    String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
+    MobZookeeper zk = MobZookeeper.newInstance(conf, id);
+    try {
+      // Try to obtain the lock. Use this lock to synchronize all the query, creation/deletion
+      // in the Zookeeper.
+      if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) {
+        LOG.warn("Can not lock the store " + familyName
+            + ". The major compaction in HBase may be in-progress. Please re-run the job.");
+        return;
+      }
+      try {
+        // Check whether an HBase major compaction is in progress now.
+        boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName);
+        if (hasChildren) {
+          LOG.warn("The major compaction in HBase may be in-progress."
+              + " Please re-run the job.");
+          return;
+        } else {
+          // Check whether a sweep tool is already in progress.
+          boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName);
+          if (hasSweeper) {
+            LOG.warn("Another sweep job is running");
+            return;
+          } else {
+            // add the sweeper node, marking that a sweep tool is in progress.
+            // No HBase major compaction or other sweep tool can run on this column family
+            // until this sweep tool is finished.
+            zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id));
+          }
+        }
+      } finally {
+        zk.unlockColumnFamily(tn.getNameAsString(), familyName);
+      }
+      Job job = null;
+      try {
+        Scan scan = new Scan();
+        scan.addFamily(family.getName());
+        // Do not retrieve the mob data when scanning
+        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+        scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+        scan.setCaching(SCAN_CACHING);
+        scan.setCacheBlocks(false);
+        scan.setMaxVersions(family.getMaxVersions());
+        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+            JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+        conf.set(SWEEP_JOB_ID, id);
+        conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName));
+        job = prepareJob(tn, familyName, scan, conf);
+        job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
+        // Record the compaction start time.
+        // In the sweep tool, only the mob file whose modification time is older than
+        // (startTime - delay) could be handled by this tool.
+        // The delay is one day. It could be configured as well, but this is only used
+        // in the test.
+        job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+            compactionStartTime);
+
+        job.setPartitionerClass(MobFilePathHashPartitioner.class);
+        submit(job, tn, familyName);
+        if (job.waitForCompletion(true)) {
+          // Archive the unused mob files.
+          removeUnusedFiles(job, tn, family);
+        }
+      } finally {
+        cleanup(job, tn, familyName);
+        zk.deleteSweeperZNode(tn.getNameAsString(), familyName);
+      }
+    } finally {
+      zk.close();
+    }
+  }
+
+  /**
+   * Prepares a map reduce job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @param scan The current scan.
+   * @param conf The current configuration.
+   * @return A map reduce job.
+   * @throws IOException
+   */
+  private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
+      throws IOException {
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(SweepMapper.class);
+    TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
+        SweepMapper.class, Text.class, Writable.class, job);
+
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(KeyValue.class);
+    job.setReducerClass(SweepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    String jobName = getCustomJobName(this.getClass().getSimpleName(), tn.getNameAsString(),
+        familyName);
+    job.setJobName(jobName);
+    if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+      String fileLoc = conf.get(CREDENTIALS_LOCATION);
+      Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+      job.getCredentials().addAll(cred);
+    }
+    return job;
+  }
+
+  /**
+   * Gets a customized job name.
+   * It's className-mapperClassName-reducerClassName-tableName-familyName.
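+   * For example: SweepJob-SweepMapper-SweepReducer-t1-f1 for a hypothetical table "t1"
+   * and family "f1".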
+   * @param className The current class name.
+   * @param tableName The current table name.
+   * @param familyName The current family name.
+   * @return The customized job name.
+   */
+  private static String getCustomJobName(String className, String tableName, String familyName) {
+    StringBuilder name = new StringBuilder();
+    name.append(className);
+    name.append('-').append(SweepMapper.class.getSimpleName());
+    name.append('-').append(SweepReducer.class.getSimpleName());
+    name.append('-').append(tableName);
+    name.append('-').append(familyName);
+    return name.toString();
+  }
+
+  /**
+   * Submits a job.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @throws IOException
+   */
+  private void submit(Job job, TableName tn, String familyName) throws IOException {
+    // delete the temp directory of the mob files in case of a failure in the previous
+    // execution.
+    Path tempDir =
+        new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
+    Path mobCompactionTempDir =
+        new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
+    Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
+    job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
+    // delete the working directory in case it was not deleted by the last run.
+    fs.delete(workingPath, true);
+    // create the working directory.
+    fs.mkdirs(workingPath);
+    // create a sequence file which contains the names of all the existing files.
+    Path workingPathOfFiles = new Path(workingPath, "files");
+    Path workingPathOfNames = new Path(workingPath, "names");
+    job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
+    Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
+    job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+    Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
+    job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    // create a file that includes all the existing mob files whose creation time is older than
+    // (now - oneDay)
+    fs.create(allFileNamesPath, true);
+    // create a directory where the files containing names of visited mob files are saved.
+    fs.mkdirs(visitedFileNamesPath);
+    Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
+    // Find all the files whose creation time is older than one day.
+    // Write those file names to a file.
+    // Each reducer has a writer that writes the visited file names to a file which is saved
+    // in WORKING_VISITED_DIR.
+    // After the job is finished, compare those files, then find out the unused mob files and
+    // archive them.
+    FileStatus[] files = fs.listStatus(mobStorePath);
+    Set<String> fileNames = new TreeSet<String>();
+    long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+    for (FileStatus fileStatus : files) {
+      if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
+        if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
+          // only record the potentially unused files older than one day.
+          fileNames.add(fileStatus.getPath().getName());
+        }
+      }
+    }
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
+        allFileNamesPath, String.class, String.class);
+    try {
+      for (String fileName : fileNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  /**
+   * Gets the unused mob files.
+   * Compares the file which contains all the existing mob files with the visited files,
+   * and finds out the unused mob files so the caller can archive them.
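+   * For example, if the all-names file lists {f1, f2, f3} and the visited files contain
+   * only {f2}, the files f1 and f3 are returned as unused.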
+   * @param conf The current configuration.
+   * @return The unused mob files.
+   * @throws IOException
+   */
+  List<String> getUnusedFiles(Configuration conf) throws IOException {
+    // find out the unused files and archive them
+    Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
+    SequenceFile.Reader allNamesReader = null;
+    MergeSortReader visitedNamesReader = null;
+    List<String> toBeArchived = new ArrayList<String>();
+    try {
+      allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
+      visitedNamesReader = new MergeSortReader(fs, conf,
+          new Path(conf.get(WORKING_VISITED_DIR_KEY)));
+      String nextAll = (String) allNamesReader.next((String) null);
+      String nextVisited = visitedNamesReader.next();
+      do {
+        if (nextAll != null) {
+          if (nextVisited != null) {
+            int compare = nextAll.compareTo(nextVisited);
+            if (compare < 0) {
+              toBeArchived.add(nextAll);
+              nextAll = (String) allNamesReader.next((String) null);
+            } else if (compare > 0) {
+              nextVisited = visitedNamesReader.next();
+            } else {
+              nextAll = (String) allNamesReader.next((String) null);
+              nextVisited = visitedNamesReader.next();
+            }
+          } else {
+            toBeArchived.add(nextAll);
+            nextAll = (String) allNamesReader.next((String) null);
+          }
+        } else {
+          break;
+        }
+      } while (nextAll != null || nextVisited != null);
+    } finally {
+      if (allNamesReader != null) {
+        allNamesReader.close();
+      }
+      if (visitedNamesReader != null) {
+        visitedNamesReader.close();
+      }
+    }
+    return toBeArchived;
+  }
+
+  /**
+   * Archives unused mob files.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param hcd The descriptor of the current column family.
+   * @throws IOException
+   */
+  private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
+    // find out the unused files and archive them
+    List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+    List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
+    // archive them
+    Path mobStorePath = MobUtils
+        .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
+    for (String archiveFileName : toBeArchived) {
+      Path path = new Path(mobStorePath, archiveFileName);
+      storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
+    }
+    if (!storeFiles.isEmpty()) {
+      try {
+        MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
+            FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
+        LOG.info(storeFiles.size() + " unused MOB files have been removed");
+      } catch (Exception e) {
+        LOG.error("Fail to archive the store files " + storeFiles, e);
+      }
+    }
+  }
+
+  /**
+   * Deletes the working directory.
+   * @param job The current job.
+   * @param tn The current table name.
+   * @param familyName The current family name.
+   * @throws IOException
+   */
+  private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+    if (job != null) {
+      // delete the working directory
+      Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
+      try {
+        fs.delete(workingPath, true);
+      } catch (IOException e) {
+        LOG.warn("Fail to delete the working directory after sweeping store " + familyName
+            + " in the table " + tn.getNameAsString(), e);
+      }
+    }
+  }
+
+  /**
+   * A result with index.
+   */
+  private class IndexedResult implements Comparable<IndexedResult> {
+    private int index;
+    private String value;
+
+    public IndexedResult(int index, String value) {
+      this.index = index;
+      this.value = value;
+    }
+
+    public int getIndex() {
+      return this.index;
+    }
+
+    public String getValue() {
+      return this.value;
+    }
+
+    @Override
+    public int compareTo(IndexedResult o) {
+      if (this.value == null) {
+        // return 0 only when both values are null, to keep compareTo symmetric
+        return o.value == null ? 0 : -1;
+      } else if (o.value == null) {
+        return 1;
+      } else {
+        return this.value.compareTo(o.value);
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof IndexedResult)) {
+        return false;
+      }
+      return compareTo((IndexedResult) obj) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return value.hashCode();
+    }
+  }
+
+  /**
+   * Merge sort reader.
+   * It merges and sorts the readers over different sequence files into one, so that
+   * the results are read in order.
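+   * For example, merging two sorted sequence files with the keys {a, c} and {b, d}
+   * yields the keys in the order a, b, c, d.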
+   */
+  private class MergeSortReader {
+
+    private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
+    private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
+
+    public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
+      if (fs.exists(path)) {
+        FileStatus[] files = fs.listStatus(path);
+        int index = 0;
+        for (FileStatus file : files) {
+          if (file.isFile()) {
+            SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+            String key = (String) reader.next((String) null);
+            if (key != null) {
+              results.add(new IndexedResult(index, key));
+              readers.add(reader);
+              index++;
+            }
+          }
+        }
+      }
+    }
+
+    public String next() throws IOException {
+      IndexedResult result = results.poll();
+      if (result != null) {
+        SequenceFile.Reader reader = readers.get(result.getIndex());
+        String key = (String) reader.next((String) null);
+        if (key != null) {
+          results.add(new IndexedResult(result.getIndex(), key));
+        }
+        return result.getValue();
+      }
+      return null;
+    }
+
+    public void close() {
+      for (SequenceFile.Reader reader : readers) {
+        IOUtils.closeStream(reader);
+      }
+    }
+  }
+
+  /**
+   * The counter used in sweep job.
+   */
+  public enum SweepCounter {
+
+    /**
+     * How many files are read.
+     */
+    INPUT_FILE_COUNT,
+
+    /**
+     * How many files need to be merged or cleaned.
+     */
+    FILE_TO_BE_MERGE_OR_CLEAN,
+
+    /**
+     * How many files are left after merging.
+     */
+    FILE_AFTER_MERGE_OR_CLEAN,
+
+    /**
+     * How many records are updated.
+     */
+    RECORDS_UPDATED,
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
new file mode 100644
index 0000000..b789332
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker on the sweep tool node in zookeeper.
+ * The sweep tool node is an ephemeral one; when the process dies this node is deleted,
+ * but at that time the MR job might still be running, and if another sweep job were
+ * started then, two MR jobs for the same column family would run at the same time.
+ * This tracker watches this ephemeral node; if it's gone or it was not created by the
+ * sweep job that owns the current MR, the current process is aborted.
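+ *
+ * A minimal usage sketch, mirroring how the sweep mapper and reducer use it (the
+ * watcher, node path and job id are read from the job configuration):
+ * <pre>
+ *   SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+ *   tracker.start(); // exits the process if the node is missing or owned by another job
+ * </pre>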
+ */
+@InterfaceAudience.Private
+public class SweepJobNodeTracker extends ZooKeeperListener {
+
+  private String node;
+  private String sweepJobId;
+
+  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+    super(watcher);
+    this.node = node;
+    this.sweepJobId = sweepJobId;
+  }
+
+  /**
+   * Registers the watcher on the sweep job node.
+   * If there is no such sweep job node, or it was not created by the sweep job that
+   * owns the current MR, the current process will be aborted.
+   */
+  public void start() throws KeeperException {
+    watcher.registerListener(this);
+    if (ZKUtil.watchAndCheckExists(watcher, node)) {
+      byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+      if (data != null) {
+        if (!sweepJobId.equals(Bytes.toString(data))) {
+          System.exit(1);
+        }
+      }
+    } else {
+      System.exit(1);
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    // If the ephemeral node is deleted, abort the current process.
+    if (node.equals(path)) {
+      System.exit(1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
new file mode 100644
index 0000000..f508b93
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The mapper of a sweep job.
+ * Takes the rows from the table and their results and maps them to
+ * <filename:Text, mobValue:KeyValue> pairs, where mobValue is the actual cell in HBase.
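+ * For example, a mob ref cell that points at the mob file fileName is written out as the
+ * pair (new Text(fileName), kv), so all the references to one mob file meet in the same
+ * reducer call.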
+ */
+@InterfaceAudience.Private
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+  private ZooKeeperWatcher zkw = null;
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+      tracker.start();
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+    if (zkw != null) {
+      zkw.close();
+    }
+  }
+
+  @Override
+  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+      InterruptedException {
+    if (columns == null) {
+      return;
+    }
+    KeyValue[] kvList = columns.raw();
+    if (kvList == null || kvList.length == 0) {
+      return;
+    }
+    for (KeyValue kv : kvList) {
+      if (MobUtils.hasValidMobRefCellValue(kv)) {
+        String fileName = MobUtils.getMobFileName(kv);
+        context.write(new Text(fileName), kv);
+      }
+    }
+  }
+}