Posted to commits@hbase.apache.org by jm...@apache.org on 2015/02/22 21:56:18 UTC

[45/50] [abbrv] hbase git commit: Merge branch 'master' (2/11/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
index 9a8b7d9,0000000..4bf1623
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
@@@ -1,652 -1,0 +1,652 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.RejectedExecutionHandler;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.LargeTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Admin;
 +import org.apache.hadoop.hbase.client.Delete;
 +import org.apache.hadoop.hbase.client.Durability;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.client.ResultScanner;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.regionserver.HRegion;
 +import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.Threads;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(LargeTests.class)
 +public class TestMobFileCompactor {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private Configuration conf = null;
 +  private String tableNameAsString;
 +  private TableName tableName;
 +  private static HTable hTable;
 +  private static Admin admin;
 +  private static HTableDescriptor desc;
 +  private static HColumnDescriptor hcd1;
 +  private static HColumnDescriptor hcd2;
 +  private static FileSystem fs;
 +  private final static String family1 = "family1";
 +  private final static String family2 = "family2";
 +  private final static String qf1 = "qualifier1";
 +  private final static String qf2 = "qualifier2";
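 +  // each row key is prefixed with '0', '1' or '2'; with the split keys below this gives three regions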
 +  private static byte[] KEYS = Bytes.toBytes("012");
 +  private static int regionNum = KEYS.length;
 +  private static int delRowNum = 1;
 +  private static int delCellNum = 6;
 +  private static int cellNumPerRow = 3;
 +  private static int rowNumPerFile = 2;
 +  private static ExecutorService pool;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.startMiniCluster(1);
 +    pool = createThreadPool(TEST_UTIL.getConfiguration());
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    pool.shutdown();
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  @Before
 +  public void setUp() throws Exception {
 +    fs = TEST_UTIL.getTestFileSystem();
 +    conf = TEST_UTIL.getConfiguration();
 +    long tid = System.currentTimeMillis();
 +    tableNameAsString = "testMob" + tid;
 +    tableName = TableName.valueOf(tableNameAsString);
 +    hcd1 = new HColumnDescriptor(family1);
 +    hcd1.setMobEnabled(true);
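 +    // a mob threshold of 0 forces every value of this family into a mob file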
 +    hcd1.setMobThreshold(0L);
 +    hcd1.setMaxVersions(4);
 +    hcd2 = new HColumnDescriptor(family2);
 +    hcd2.setMobEnabled(true);
 +    hcd2.setMobThreshold(0L);
 +    hcd2.setMaxVersions(4);
 +    desc = new HTableDescriptor(tableName);
 +    desc.addFamily(hcd1);
 +    desc.addFamily(hcd2);
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc, getSplitKeys());
 +    hTable = new HTable(conf, tableNameAsString);
 +    hTable.setAutoFlush(false, false);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(tableName);
 +    admin.deleteTable(tableName);
 +    admin.close();
 +    hTable.close();
 +    fs.delete(TEST_UTIL.getDataTestDir(), true);
 +  }
 +
 +  @Test
 +  public void testCompactionWithoutDelFiles() throws Exception {
 +    resetConf();
 +    int count = 4;
 +    // generate mob files
 +    loadData(count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob file count", regionNum*count, countFiles(true, family1));
 +    assertEquals("Before compaction: del file count", 0, countFiles(false, family1));
 +
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob file count", regionNum, countFiles(true, family1));
 +    assertEquals("After compaction: del file count", 0, countFiles(false, family1));
 +  }
 +
 +  @Test
 +  public void testCompactionWithDelFiles() throws Exception {
 +    resetConf();
 +    int count = 4;
 +    // generate mob files
 +    loadData(count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
 +        countMobCells(hTable));
 +    assertEquals("Before deleting: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("Before deleting: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("Before compaction: family2 file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After compaction: family1 mob file count", regionNum,
 +        countFiles(true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +    assertRefFileNameEqual(family1);
 +  }
 +
 +  private void assertRefFileNameEqual(String familyName) throws IOException {
 +    Scan scan = new Scan();
 +    scan.addFamily(Bytes.toBytes(familyName));
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner results = hTable.getScanner(scan);
 +    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
 +        tableName), familyName);
 +    List<Path> actualFilePaths = new ArrayList<>();
 +    List<Path> expectFilePaths = new ArrayList<>();
 +    for (Result res : results) {
 +      for (Cell cell : res.listCells()) {
 +        byte[] referenceValue = CellUtil.cloneValue(cell);
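 +        // skip the leading int (the mob value length) to get the referenced mob file name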
 +        String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
 +            referenceValue.length - Bytes.SIZEOF_INT);
 +        Path targetPath = new Path(mobFamilyPath, fileName);
 +        if (!actualFilePaths.contains(targetPath)) {
 +          actualFilePaths.add(targetPath);
 +        }
 +      }
 +    }
 +    results.close();
 +    if (fs.exists(mobFamilyPath)) {
 +      FileStatus[] files = fs.listStatus(mobFamilyPath);
 +      for (FileStatus file : files) {
 +        if (!StoreFileInfo.isDelFile(file.getPath())) {
 +          expectFilePaths.add(file.getPath());
 +        }
 +      }
 +    }
 +    Collections.sort(actualFilePaths);
 +    Collections.sort(expectFilePaths);
 +    assertEquals(expectFilePaths, actualFilePaths);
 +  }
 +
 +  @Test
 +  public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
 +    resetConf();
 +    int mergeSize = 5000;
 +    // change the mob compaction merge size
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +
 +    int count = 4;
 +    // generate mob files
 +    loadData(count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
 +        countMobCells(hTable));
 +    assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1));
 +
 +    int largeFilesCount = countLargeFiles(mergeSize, family1);
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    // After the compaction, the files smaller than the mob compaction merge size
 +    // are merged into one file per region
 +    assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
 +        countFiles(true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("After compaction: family1 del file count", regionNum,
 +        countFiles(false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +  }
 +
 +  @Test
 +  public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
 +    resetConf();
 +    int batchSize = 2;
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
 +    int count = 4;
 +    // generate mob files
 +    loadData(count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("Before deleting: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
 +        countFiles(true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +  }
 +
 +  @Test
 +  public void testCompactionWithHFileLink() throws IOException, InterruptedException {
 +    resetConf();
 +    int count = 4;
 +    // generate mob files
 +    loadData(count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    long tid = System.currentTimeMillis();
 +    byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
 +    // take a snapshot
 +    admin.snapshot(snapshotName1, tableName);
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After first compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After first compaction: family1 mob file count", regionNum,
 +        countFiles(true, family1));
 +    assertEquals("After first compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("After first compaction: family1 del file count", 0, countFiles(false, family1));
 +    assertEquals("After first compaction: family2 del file count", regionNum,
 +        countFiles(false, family2));
 +    assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
 +    assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
 +
 +    admin.disableTable(tableName);
 +    // Restore from snapshot, the hfilelink will exist in mob dir
 +    admin.restoreSnapshot(snapshotName1);
 +    admin.enableTable(tableName);
 +
 +    assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("After restoring snapshot: mob cells count",
 +        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
 +    assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
 +        countFiles(true, family1));
 +    assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("After restoring snapshot: family1 del file count", 0,
 +        countFiles(false, family1));
 +    assertEquals("After restoring snapshot: family2 del file count", 0,
 +        countFiles(false, family2));
 +    assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
 +        countHFileLinks(family1));
 +    assertEquals("After restoring snapshot: family2 hfilelink count", 0,
 +        countHFileLinks(family2));
 +
 +    compactor.compact();
 +
 +    assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("After second compaction: mob cells count",
 +        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
 +    assertEquals("After second compaction: family1 mob file count", regionNum,
 +        countFiles(true, family1));
 +    assertEquals("After second compaction: family2 mob file count", regionNum*count,
 +        countFiles(true, family2));
 +    assertEquals("After second compaction: family1 del file count", 0, countFiles(false, family1));
 +    assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2));
 +    assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
 +    assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
 +  }
 +
 +  /**
 +   * Gets the number of rows in the given table.
 +   * @param table the table to scan
 +   * @return the number of rows
 +   */
 +  private int countMobRows(final HTable table) throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      count++;
 +    }
 +    results.close();
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of cells in the given table.
 +   * @param table the table to scan
 +   * @return the number of cells
 +   */
 +  private int countMobCells(final HTable table) throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      for (Cell cell : res.listCells()) {
 +        count++;
 +      }
 +    }
 +    results.close();
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of files in the mob path.
 +   * @param isMobFile true to count the mob files, false to count the del files
 +   * @param familyName the family name
 +   * @return the number of files
 +   */
 +  private int countFiles(boolean isMobFile, String familyName) throws IOException {
 +    Path mobDirPath = MobUtils.getMobFamilyPath(
 +        MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    int count = 0;
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = fs.listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        if (isMobFile) {
 +          if (!StoreFileInfo.isDelFile(file.getPath())) {
 +            count++;
 +          }
 +        } else {
 +          if (StoreFileInfo.isDelFile(file.getPath())) {
 +            count++;
 +          }
 +        }
 +      }
 +    }
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of HFileLinks in the mob path.
 +   * @param familyName the family name
 +   * @return the number of HFileLinks
 +   */
 +  private int countHFileLinks(String familyName) throws IOException {
 +    Path mobDirPath = MobUtils.getMobFamilyPath(
 +        MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    int count = 0;
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = fs.listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        if (HFileLink.isHFileLink(file.getPath())) {
 +          count++;
 +        }
 +      }
 +    }
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of files larger than the given size.
 +   * @param size the size threshold
 +   * @param familyName the family name
 +   * @return the number of files larger than the size
 +   */
 +  private int countLargeFiles(int size, String familyName) throws IOException {
 +    Path mobDirPath = MobUtils.getMobFamilyPath(
 +        MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    int count = 0;
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = fs.listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        // ignore the del files in the mob path
 +        if ((!StoreFileInfo.isDelFile(file.getPath()))
 +            && (file.getLen() > size)) {
 +          count++;
 +        }
 +      }
 +    }
 +    return count;
 +  }
 +
 +  /**
 +   * Loads data into the table.
 +   * @param fileNum the number of mob files to generate per region
 +   * @param rowNumPerFile the number of rows per file
 +   */
 +  private void loadData(int fileNum, int rowNumPerFile) throws IOException,
 +      InterruptedException {
 +    if (fileNum <= 0) {
 +      throw new IllegalArgumentException("fileNum must be positive");
 +    }
 +    for (byte k0 : KEYS) {
 +      byte[] k = new byte[] { k0 };
 +      for (int i = 0; i < fileNum * rowNumPerFile; i++) {
 +        byte[] key = Bytes.add(k, Bytes.toBytes(i));
 +        byte[] mobVal = makeDummyData(10 * (i + 1));
 +        Put put = new Put(key);
 +        put.setDurability(Durability.SKIP_WAL);
 +        put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
 +        put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
 +        put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
 +        hTable.put(put);
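 +        // flush every rowNumPerFile rows so that each flush creates one mob file per region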
 +        if ((i + 1) % rowNumPerFile == 0) {
 +          hTable.flushCommits();
 +          admin.flush(tableName);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Deletes a family, a row and a cell to create the del files.
 +   */
 +  private void createDelFile() throws IOException, InterruptedException {
 +    for (byte k0 : KEYS) {
 +      byte[] k = new byte[] { k0 };
 +      // delete a family
 +      byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
 +      Delete delete1 = new Delete(key1);
 +      delete1.deleteFamily(Bytes.toBytes(family1));
 +      hTable.delete(delete1);
 +      // delete one row
 +      byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
 +      Delete delete2 = new Delete(key2);
 +      hTable.delete(delete2);
 +      // delete one cell
 +      byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
 +      Delete delete3 = new Delete(key3);
 +      delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
 +      hTable.delete(delete3);
 +      hTable.flushCommits();
 +      admin.flush(tableName);
 +      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
 +          Bytes.toBytes(tableNameAsString));
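 +      // major-compact each region so the delete markers are written out as del files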
 +      for (HRegion region : regions) {
 +        region.waitForFlushesAndCompactions();
 +        region.compactStores(true);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Creates the dummy data with a specific size.
 +   * @param size the size of the data
 +   * @return the dummy data
 +   */
 +  private byte[] makeDummyData(int size) {
 +    byte[] dummyData = new byte[size];
 +    new Random().nextBytes(dummyData);
 +    return dummyData;
 +  }
 +
 +  /**
 +   * Gets the split keys
 +   */
 +  public static byte[][] getSplitKeys() {
 +    byte[][] splitKeys = new byte[KEYS.length - 1][];
 +    for (int i = 0; i < splitKeys.length; ++i) {
 +      splitKeys[i] = new byte[] { KEYS[i + 1] };
 +    }
 +    return splitKeys;
 +  }
 +
 +  private static ExecutorService createThreadPool(Configuration conf) {
 +    int maxThreads = 10;
 +    long keepAliveTime = 60;
 +    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
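 +    // a SynchronousQueue has no capacity; the rejection handler below blocks until a worker thread is free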
 +    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
 +        keepAliveTime, TimeUnit.SECONDS, queue,
 +        Threads.newDaemonThreadFactory("MobFileCompactionChore"),
 +        new RejectedExecutionHandler() {
 +          @Override
 +          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 +            try {
 +              // wait for a thread to pick up the task instead of throwing an exception.
 +              queue.put(r);
 +            } catch (InterruptedException e) {
 +              throw new RejectedExecutionException(e);
 +            }
 +          }
 +        });
 +    pool.allowCoreThreadTimeOut(true);
 +    return pool;
 +  }
 +
 +  /**
 +   * Resets the configuration.
 +   */
 +  private void resetConf() {
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
index ac66d95,0000000..f9159aa
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
@@@ -1,60 -1,0 +1,60 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.SmallTests;
++import org.apache.hadoop.hbase.testclassification.SmallTests;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(SmallTests.class)
 +public class TestPartitionedMobFileCompactionRequest {
 +
 +  @Test
 +  public void testCompactedPartitionId() {
 +    String startKey1 = "startKey1";
 +    String startKey2 = "startKey2";
 +    String date1 = "date1";
 +    String date2 = "date2";
 +    CompactionPartitionId partitionId1 = new CompactionPartitionId(startKey1, date1);
 +    CompactionPartitionId partitionId2 = new CompactionPartitionId(startKey2, date2);
 +    CompactionPartitionId partitionId3 = new CompactionPartitionId(startKey1, date2);
 +
 +    Assert.assertTrue(partitionId1.equals(partitionId1));
 +    Assert.assertFalse(partitionId1.equals(partitionId2));
 +    Assert.assertFalse(partitionId1.equals(partitionId3));
 +    Assert.assertFalse(partitionId2.equals(partitionId3));
 +
 +    Assert.assertEquals(startKey1, partitionId1.getStartKey());
 +    Assert.assertEquals(date1, partitionId1.getDate());
 +  }
 +
 +  @Test
 +  public void testCompactedPartition() {
 +    CompactionPartitionId partitionId = new CompactionPartitionId("startKey1", "date1");
 +    CompactionPartition partition = new CompactionPartition(partitionId);
 +    FileStatus file = new FileStatus(1, false, 1, 1024, 1, new Path("/test"));
 +    partition.addFile(file);
 +    Assert.assertEquals(file, partition.listFiles().get(0));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
index 1d64c0c,0000000..12c88b2
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
@@@ -1,423 -1,0 +1,423 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.RejectedExecutionHandler;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValue.Type;
- import org.apache.hadoop.hbase.LargeTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
 +import org.apache.hadoop.hbase.regionserver.BloomType;
 +import org.apache.hadoop.hbase.regionserver.HStore;
 +import org.apache.hadoop.hbase.regionserver.ScanInfo;
 +import org.apache.hadoop.hbase.regionserver.ScanType;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 +import org.apache.hadoop.hbase.regionserver.StoreScanner;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.Threads;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(LargeTests.class)
 +public class TestPartitionedMobFileCompactor {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static String family = "family";
 +  private final static String qf = "qf";
 +  private HColumnDescriptor hcd = new HColumnDescriptor(family);
 +  private Configuration conf = TEST_UTIL.getConfiguration();
 +  private CacheConfig cacheConf = new CacheConfig(conf);
 +  private FileSystem fs;
 +  private List<FileStatus> mobFiles = new ArrayList<>();
 +  private List<FileStatus> delFiles = new ArrayList<>();
 +  private List<FileStatus> allFiles = new ArrayList<>();
 +  private Path basePath;
 +  private String mobSuffix;
 +  private String delSuffix;
 +  private static ExecutorService pool;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +    TEST_UTIL.startMiniCluster(1);
 +    pool = createThreadPool(TEST_UTIL.getConfiguration());
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    pool.shutdown();
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void init(String tableName) throws Exception {
 +    fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
 +    basePath = new Path(new Path(mobTestDir, tableName), family);
 +    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
 +    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithAllFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithAllFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for (FileStatus file : mobFiles) {
 +      if (file.getLen() < mergeSize) {
 +        String fileName = file.getPath().getName();
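 +        // the first 32 characters of a mob file name are the MD5 hash of the start key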
 +        String startKey = fileName.substring(0, 32);
 +        expectedStartKeys.add(startKey);
 +      }
 +    }
 +    testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithPartFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithPartFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = 4000;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for (FileStatus file : mobFiles) {
 +      if (file.getLen() < mergeSize) {
 +        String fileName = file.getPath().getName();
 +        String startKey = fileName.substring(0, 32);
 +        expectedStartKeys.add(startKey);
 +      }
 +    }
 +    // set the mob file compaction mergeable threshold
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +    testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithDefaultBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +    testCompactDelFiles(tableName, 1, 13);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithSmallBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +
 +    // set the mob file compaction batch size
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
 +    testCompactDelFiles(tableName, 1, 13);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithSmallBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +
 +    // set the max del file count
 +    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
 +    // set the mob file compaction batch size
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
 +    testCompactDelFiles(tableName, 4, 13);
 +  }
 +
 +  /**
 +   * Tests the selectFiles method.
 +   * @param tableName the table name
 +   * @param type the expected compaction type
 +   * @param expected the expected start keys
 +   */
 +  private void testSelectFiles(String tableName, final CompactionType type,
 +      final List<String> expected) throws IOException {
 +    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
 +      TableName.valueOf(tableName), hcd, pool) {
 +      @Override
 +      public List<Path> compact(List<FileStatus> files) throws IOException {
 +        if (files == null || files.isEmpty()) {
 +          return null;
 +        }
 +        PartitionedMobFileCompactionRequest request = select(files);
 +        // assert the compaction type matches the expected type
 +        Assert.assertEquals(type, request.type);
 +        // assert the right partitions are selected
 +        compareCompactedPartitions(expected, request.compactionPartitions);
 +        // assert the right del files are selected
 +        compareDelFiles(request.delFiles);
 +        return null;
 +      }
 +    };
 +    compactor.compact(allFiles);
 +  }
 +
 +  /**
 +   * Tests the compactDelFiles method.
 +   * @param tableName the table name
 +   * @param expectedFileCount the expected file count
 +   * @param expectedCellCount the expected cell count
 +   */
 +  private void testCompactDelFiles(String tableName, final int expectedFileCount,
 +      final int expectedCellCount) throws IOException {
 +    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
 +      TableName.valueOf(tableName), hcd, pool) {
 +      @Override
 +      protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
 +          throws IOException {
 +        List<Path> delFilePaths = new ArrayList<Path>();
 +        for (FileStatus delFile : request.delFiles) {
 +          delFilePaths.add(delFile.getPath());
 +        }
 +        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
 +        // assert the del files are merged.
 +        Assert.assertEquals(expectedFileCount, newDelPaths.size());
 +        Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
 +        return null;
 +      }
 +    };
 +
 +    compactor.compact(allFiles);
 +  }
 +
 +  /**
 +   * Lists the files in the path
 +   */
 +  private void listFiles() throws IOException {
 +    for (FileStatus file : fs.listStatus(basePath)) {
 +      allFiles.add(file);
 +      if (file.getPath().getName().endsWith("_del")) {
 +        delFiles.add(file);
 +      } else {
 +        mobFiles.add(file);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Compares the compacted partitions.
 +   * @param expected the expected start keys
 +   * @param partitions the collection of CompactionPartitions
 +   */
 +  private void compareCompactedPartitions(List<String> expected,
 +      Collection<CompactionPartition> partitions) {
 +    List<String> actualKeys = new ArrayList<>();
 +    for (CompactionPartition partition : partitions) {
 +      actualKeys.add(partition.getPartitionId().getStartKey());
 +    }
 +    Collections.sort(expected);
 +    Collections.sort(actualKeys);
 +    Assert.assertEquals(expected.size(), actualKeys.size());
 +    for (int i = 0; i < expected.size(); i++) {
 +      Assert.assertEquals(expected.get(i), actualKeys.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Compares the del files.
 +   * @param allDelFiles all the del files
 +   */
 +  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
 +    int i = 0;
 +    for (FileStatus file : allDelFiles) {
 +      Assert.assertEquals(delFiles.get(i), file);
 +      i++;
 +    }
 +  }
 +
 +  /**
 +   * Creates store files.
 +   * @param basePath the path to create the files in
 +   * @param family the family name
 +   * @param qualifier the column qualifier
 +   * @param count the number of store files
 +   * @param type the key type
 +   */
 +  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
 +      Type type) throws IOException {
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
 +    String startKey = "row_";
 +    MobFileName mobFileName = null;
 +    for (int i = 0; i < count; i++) {
 +      byte[] startRow = Bytes.toBytes(startKey + i);
 +      if (type.equals(Type.Delete)) {
 +        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), delSuffix);
 +      }
 +      if (type.equals(Type.Put)) {
 +        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), mobSuffix);
 +      }
 +      StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +          .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
 +      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
 +          type, (i+1)*1000);
 +    }
 +  }
 +
 +  /**
 +   * Writes data to store file.
 +   * @param writer the store file writer
 +   * @param row the row key
 +   * @param family the family name
 +   * @param qualifier the column qualifier
 +   * @param type the key type
 +   * @param size the size of value
 +   */
 +  private static void writeStoreFile(final StoreFile.Writer writer, byte[] row, byte[] family,
 +      byte[] qualifier, Type type, int size) throws IOException {
 +    long now = System.currentTimeMillis();
 +    try {
 +      byte[] dummyData = new byte[size];
 +      new Random().nextBytes(dummyData);
 +      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
 +    } finally {
 +      writer.close();
 +    }
 +  }
 +
 +  /**
 +   * Gets the number of del cells in the del files.
 +   * @param paths the del file paths
 +   * @return the number of del cells
 +   */
 +  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
 +    List<StoreFile> sfs = new ArrayList<StoreFile>();
 +    int size = 0;
 +    for (Path path : paths) {
 +      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
 +      sfs.add(sf);
 +    }
 +    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
 +        false, null, HConstants.LATEST_TIMESTAMP);
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(hcd.getMaxVersions());
 +    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
 +    long ttl = HStore.determineTTLFromFamily(hcd);
 +    ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
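 +    // COMPACT_RETAIN_DELETES keeps the delete markers in the results so they can be counted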
 +    StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
 +        scanners, 0L, HConstants.LATEST_TIMESTAMP);
 +    List<Cell> results = new ArrayList<>();
 +    boolean hasMore = true;
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      size += results.size();
 +      results.clear();
 +    }
 +    scanner.close();
 +    return size;
 +  }
 +
 +  private static ExecutorService createThreadPool(Configuration conf) {
 +    int maxThreads = 10;
 +    long keepAliveTime = 60;
 +    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
 +    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
 +      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
 +      new RejectedExecutionHandler() {
 +        @Override
 +        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 +          try {
 +            // wait for a thread to pick up the task instead of throwing an exception.
 +            queue.put(r);
 +          } catch (InterruptedException e) {
 +            throw new RejectedExecutionException(e);
 +          }
 +        }
 +      });
 +    pool.allowCoreThreadTimeOut(true);
 +    return pool;
 +  }
 +
 +  /**
 +   * Resets the configuration.
 +   */
 +  private void resetConf() {
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
index e0b9a83,0000000..49345e4
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@@ -1,168 -1,0 +1,168 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import static org.junit.Assert.assertArrayEquals;
 +import static org.junit.Assert.assertEquals;
 +
 +import java.io.IOException;
 +import java.util.List;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.CommonConfigurationKeys;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
- import org.apache.hadoop.hbase.MediumTests;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.serializer.JavaSerialization;
 +import org.apache.hadoop.io.serializer.WritableSerialization;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobSweepJob {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
 +        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
 +    TEST_UTIL.startMiniCluster();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
 +      String[] fileNames) throws IOException {
 +    // write the names to a sequence file
 +    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
 +        String.class, String.class);
 +    try {
 +      for (String fileName : fileNames) {
 +        writer.append(fileName, MobConstants.EMPTY_STRING);
 +      }
 +    } finally {
 +      IOUtils.closeStream(writer);
 +    }
 +  }
 +
 +  @Test
 +  public void testSweeperJobWithOutUnusedFile() throws Exception {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Configuration configuration = new Configuration(
 +        TEST_UTIL.getConfiguration());
 +    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
 +    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/0/all");
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
 +        visitedFileNamesPath.toString());
 +    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
 +        allFileNamesPath.toString());
 +
 +    writeFileNames(fs, configuration, allFileNamesPath,
 +        new String[] { "1", "2", "3", "4", "5", "6" });
 +
 +    Path r0 = new Path(visitedFileNamesPath, "r0");
 +    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
 +    Path r1 = new Path(visitedFileNamesPath, "r1");
 +    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5" });
 +    Path r2 = new Path(visitedFileNamesPath, "r2");
 +    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6" });
 +
 +    SweepJob sweepJob = new SweepJob(configuration, fs);
 +    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
 +
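 +    // every file in the all-names list was visited, so nothing is unused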
 +    assertEquals(0, toBeArchived.size());
 +  }
 +
 +  @Test
 +  public void testSweeperJobWithUnusedFile() throws Exception {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Configuration configuration = new Configuration(
 +        TEST_UTIL.getConfiguration());
 +    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
 +    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/1/all");
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
 +        visitedFileNamesPath.toString());
 +    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
 +        allFileNamesPath.toString());
 +
 +    writeFileNames(fs, configuration, allFileNamesPath,
 +        new String[] { "1", "2", "3", "4", "5", "6" });
 +
 +    Path r0 = new Path(visitedFileNamesPath, "r0");
 +    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
 +    Path r1 = new Path(visitedFileNamesPath, "r1");
 +    writeFileNames(fs, configuration, r1, new String[] { "1", "5" });
 +    Path r2 = new Path(visitedFileNamesPath, "r2");
 +    writeFileNames(fs, configuration, r2, new String[] { "2", "3" });
 +
 +    SweepJob sweepJob = new SweepJob(configuration, fs);
 +    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
 +
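 +    // files 4 and 6 are listed but never visited, so they are expected to be archived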
 +    assertEquals(2, toBeArchived.size());
 +    assertArrayEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
 +  }
 +
 +  @Test
 +  public void testSweeperJobWithRedundantFile() throws Exception {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Configuration configuration = new Configuration(
 +        TEST_UTIL.getConfiguration());
 +    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
 +    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/2/all");
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
 +        visitedFileNamesPath.toString());
 +    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
 +        allFileNamesPath.toString());
 +
 +    writeFileNames(fs, configuration, allFileNamesPath,
 +        new String[] { "1", "2", "3", "4", "5", "6" });
 +
 +    Path r0 = new Path(visitedFileNamesPath, "r0");
 +    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
 +    Path r1 = new Path(visitedFileNamesPath, "r1");
 +    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7" });
 +    Path r2 = new Path(visitedFileNamesPath, "r2");
 +    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4" });
 +
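 +    // r1 reports "7", which is not in the all-names file; redundant entries must not become archive candidates.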
 +    SweepJob sweepJob = new SweepJob(configuration, fs);
 +    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
 +
 +    assertEquals(0, toBeArchived.size());
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
index 2aa3a4a,0000000..9e95a39
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@@ -1,120 -1,0 +1,120 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.mockito.Matchers.any;
 +import static org.mockito.Mockito.doAnswer;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.when;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.ServerName;
- import org.apache.hadoop.hbase.SmallTests;
 +import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.testclassification.SmallTests;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 +import org.apache.hadoop.hbase.master.TableLockManager;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.mockito.invocation.InvocationOnMock;
 +import org.mockito.stubbing.Answer;
 +
 +@Category(SmallTests.class)
 +public class TestMobSweepMapper {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  @Test
 +  public void testMap() throws Exception {
 +    String prefix = "0000";
 +    final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
 +    final String mobFilePath = prefix + fileName;
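 +    // A mob reference value keeps the mob file name behind a short length prefix; "0000" stands in for that prefix here.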
 +
 +    ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
 +    final KeyValue[] kvList = new KeyValue[1];
 +    kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
 +            Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
 +
 +    Result columns = mock(Result.class);
-     when(columns.raw()).thenReturn(kvList);
++    when(columns.rawCells()).thenReturn(kvList);
 +
 +    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
 +    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
 +    TableName tn = TableName.valueOf("testSweepMapper");
 +    TableName lockName = MobUtils.getTableLockName(tn);
 +    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
 +    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
 +    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
 +    ServerName serverName = SweepJob.getCurrentServerName(configuration);
 +    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
 +
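 +    // The sweep tool holds a write lock on the table while it runs; the mapper presumably verifies this lock in setup().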
 +    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
 +        serverName);
 +    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
 +    lock.acquire();
 +    try {
 +      Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
 +        mock(Mapper.Context.class);
 +      when(ctx.getConfiguration()).thenReturn(configuration);
 +      SweepMapper map = new SweepMapper();
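 +      // Expect the mapper to strip the length prefix and emit the bare file name as key with the original cell as value.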
 +      doAnswer(new Answer<Void>() {
 +
 +        @Override
 +        public Void answer(InvocationOnMock invocation) throws Throwable {
 +          Text text = (Text) invocation.getArguments()[0];
 +          KeyValue kv = (KeyValue) invocation.getArguments()[1];
 +
 +          assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
 +          assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
 +
 +          return null;
 +        }
 +      }).when(ctx).write(any(Text.class), any(KeyValue.class));
 +
 +      map.map(r, columns, ctx);
 +    } finally {
 +      lock.release();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
index 1a69d06,0000000..308b50e
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@@ -1,220 -1,0 +1,220 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.when;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.TreeSet;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.CommonConfigurationKeys;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.MediumTests;
 +import org.apache.hadoop.hbase.ServerName;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Admin;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 +import org.apache.hadoop.hbase.master.TableLockManager;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.io.serializer.JavaSerialization;
 +import org.apache.hadoop.mapreduce.Counter;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.counters.GenericCounter;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.mockito.Matchers;
 +
 +@Category(MediumTests.class)
 +public class TestMobSweepReducer {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static String tableName = "testSweepReducer";
 +  private final static String row = "row";
 +  private final static String family = "family";
 +  private final static String qf = "qf";
 +  private static HTable table;
 +  private static Admin admin;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  @SuppressWarnings("deprecation")
 +  @Before
 +  public void setUp() throws Exception {
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
 +    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(TableName.valueOf(tableName));
 +    admin.deleteTable(TableName.valueOf(tableName));
 +    admin.close();
 +  }
 +
 +  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
 +                                              Configuration conf) throws Exception {
 +    List<String> list = new ArrayList<String>();
 +    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 +
 +    String next = (String) reader.next((String) null);
 +    while (next != null) {
 +      list.add(next);
 +      next = (String) reader.next((String) null);
 +    }
 +    reader.close();
 +    return list;
 +  }
 +
 +  @Test
 +  public void testRun() throws Exception {
 +
 +    TableName tn = TableName.valueOf(tableName);
 +    byte[] mobValueBytes = new byte[100];
 +
 +    //get the path where the mob files are stored
 +    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
 +
 +    Put put = new Put(Bytes.toBytes(row));
 +    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
 +    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
 +    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
 +    table.put(put);
 +    table.put(put2);
 +    table.flushCommits();
 +    admin.flush(tn);
 +
 +    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    //check the generation of a mob file
 +    assertEquals(1, fileStatuses.length);
 +
 +    String mobFile1 = fileStatuses[0].getPath().getName();
 +
 +    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
 +    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
 +    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
 +    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
 +    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
 +    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
 +            JavaSerialization.class.getName());
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
 +    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
 +        System.currentTimeMillis() + 24 * 3600 * 1000);
 +
 +    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
 +    TableName lockName = MobUtils.getTableLockName(tn);
 +    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
 +    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
 +    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
 +    ServerName serverName = SweepJob.getCurrentServerName(configuration);
 +    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
 +
 +    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
 +        serverName);
 +    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
 +    lock.acquire();
 +    try {
 +      // use the same counter when mocking
 +      Counter counter = new GenericCounter();
 +      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
 +      when(ctx.getConfiguration()).thenReturn(configuration);
 +      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
 +      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
 +      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
 +
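 +      // Build a mob reference cell by hand: the original value length followed by the mob file name.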
 +      byte[] refBytes = Bytes.toBytes(mobFile1);
 +      long valueLength = refBytes.length;
 +      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
 +      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
 +        KeyValue.Type.Put, newValue);
 +      List<KeyValue> list = new ArrayList<KeyValue>();
 +      list.add(kv2);
 +
 +      when(ctx.getValues()).thenReturn(list);
 +
 +      SweepReducer reducer = new SweepReducer();
 +      reducer.run(ctx);
 +    } finally {
 +      lock.release();
 +    }
 +    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    //a new mob file is generated, the old one has been archived
 +    assertEquals(1, fileStatuses2.length);
 +    String mobFile2 = fileStatuses2[0].getPath().getName();
 +    assertFalse(mobFile2.equalsIgnoreCase(mobFile1));
 +
 +    //test sequence file
 +    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
 +    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
 +    Set<String> files = new TreeSet<String>();
 +    for (FileStatus st : statuses) {
 +      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
 +              st.getPath(), configuration));
 +    }
 +    assertEquals(1, files.size());
 +    assertTrue(files.contains(mobFile1));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
index c4817aa,0000000..1689c2a
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@@ -1,307 -1,0 +1,307 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.TreeSet;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.MediumTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Admin;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.client.ResultScanner;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobSweeper {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private String tableName;
 +  private final static String row = "row_";
 +  private final static String family = "family";
 +  private final static String column = "column";
 +  private static HTable table;
 +  private static Admin admin;
 +
 +  private Random random = new Random();
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15); // avoid major compactions
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30); // avoid major compactions
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +
 +    TEST_UTIL.startMiniCluster();
 +
 +    TEST_UTIL.startMiniMapReduceCluster();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +    TEST_UTIL.shutdownMiniMapReduceCluster();
 +  }
 +
 +  @SuppressWarnings("deprecation")
 +  @Before
 +  public void setUp() throws Exception {
 +    long tid = System.currentTimeMillis();
 +    tableName = "testSweeper" + tid;
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
 +    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +    table.setAutoFlush(false);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(TableName.valueOf(tableName));
 +    admin.deleteTable(TableName.valueOf(tableName));
 +    admin.close();
 +  }
 +
 +  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
 +                                String familyName) {
 +    Path p = new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
 +            familyName);
 +    return p;
 +  }
 +
 +  private String mergeString(Set<String> set) {
 +    StringBuilder sb = new StringBuilder();
 +    for (String s : set) {
 +      sb.append(s);
 +    }
 +    return sb.toString();
 +  }
 +
 +  private void generateMobTable(int count, int flushStep)
 +          throws IOException, InterruptedException {
 +    if (count <= 0 || flushStep <= 0) {
 +      return;
 +    }
 +    int index = 0;
 +    for (int i = 0; i < count; i++) {
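 +      // 101KB values far exceed the 3-byte mob threshold set in setUp(), so every cell is written as a mob cell.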
 +      byte[] mobVal = new byte[101*1024];
 +      random.nextBytes(mobVal);
 +
 +      Put put = new Put(Bytes.toBytes(row + i));
 +      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
 +      table.put(put);
 +      if (index++ % flushStep == 0) {
 +        table.flushCommits();
 +        admin.flush(TableName.valueOf(tableName));
 +      }
 +    }
 +    table.flushCommits();
 +    admin.flush(TableName.valueOf(tableName));
 +  }
 +
 +  @Test
 +  public void testSweeper() throws Exception {
 +
 +    int count = 10;
 +    //create table and generate 10 mob files
 +    generateMobTable(count, 1);
 +
 +    //get mob files
 +    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    // mobFilesSet stores the original mob files
 +    TreeSet<String> mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    //scan the table, retrieve the references
 +    Scan scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScanned = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
 +          Bytes.toBytes(column));
 +      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +
 +    //there should be 10 mob files
 +    assertEquals(10, mobFilesScanned.size());
 +    //check if we store the correct reference of mob files
 +    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
 +
 +    Configuration conf = TEST_UTIL.getConfiguration();
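 +    // A 24-hour sweep delay makes the just-written files too young to compact; all 10 should survive the run.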
 +    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
 +
 +    String[] args = new String[2];
 +    args[0] = tableName;
 +    args[1] = family;
 +    ToolRunner.run(conf, new Sweeper(), args);
 +
 +    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    assertEquals(10, mobFilesSet.size());
 +
 +    scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
 +          column));
 +      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +
 +    assertEquals(10, mobFilesScannedAfterJob.size());
 +
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    assertEquals(10, mobFilesSet.size());
 +    assertTrue(mobFilesScannedAfterJob.iterator().next()
 +            .equalsIgnoreCase(mobFilesSet.iterator().next()));
 +  }
 +
 +  @Test
 +  public void testCompactionDelaySweeper() throws Exception {
 +
 +    int count = 10;
 +    //create table and generate 10 mob files
 +    generateMobTable(count, 1);
 +
 +    //get mob files
 +    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    // mobFilesSet stores the original mob files
 +    TreeSet<String> mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    //scan the table, retrieve the references
 +    Scan scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScanned = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
 +              Bytes.toBytes(column));
 +      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +
 +    //there should be 10 mob files
 +    assertEquals(10, mobFilesScanned.size());
 +    //check if we store the correct reference of mob files
 +    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
 +
 +    Configuration conf = TEST_UTIL.getConfiguration();
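 +    // With no delay, every small mob file is immediately eligible; the sweeper should merge all 10 into one.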
 +    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
 +
 +    String[] args = new String[2];
 +    args[0] = tableName;
 +    args[1] = family;
 +    ToolRunner.run(conf, new Sweeper(), args);
 +
 +    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    assertEquals(1, mobFilesSet.size());
 +
 +    scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
 +              column));
 +      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +
 +    assertEquals(1, mobFilesScannedAfterJob.size());
 +
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    assertEquals(1, mobFilesSet.size());
 +    assertTrue(mobFilesScannedAfterJob.iterator().next()
 +            .equalsIgnoreCase(mobFilesSet.iterator().next()));
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index c7d146b,852d319..0d28e54
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@@ -242,73 -242,22 +242,92 @@@ public class MetricsRegionServerWrapper
    }
  
    @Override
+   public long getHedgedReadOps() {
+     return 100;
+   }
+ 
+   @Override
+   public long getHedgedReadWins() {
+     return 10;
+   }
+ 
+   @Override
+   public long getBlockedRequestsCount() {
+     return 0;
+   }
+ 
+   @Override
+   public int getSplitQueueSize() {
+     return 0;
+   }
++
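 +  // The fixed values below give the mob metrics tests deterministic numbers to assert against.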
++  @Override
 +  public long getMobCompactedIntoMobCellsCount() {
 +    return 20;
 +  }
 +
 +  @Override
 +  public long getMobCompactedFromMobCellsCount() {
 +    return 10;
 +  }
 +
 +  @Override
 +  public long getMobCompactedIntoMobCellsSize() {
 +    return 200;
 +  }
 +
 +  @Override
 +  public long getMobCompactedFromMobCellsSize() {
 +    return 100;
 +  }
 +
 +  @Override
 +  public long getMobFlushCount() {
 +    return 1;
 +  }
 +
 +  @Override
 +  public long getMobFlushedCellsCount() {
 +    return 10;
 +  }
 +
 +  @Override
 +  public long getMobFlushedCellsSize() {
 +    return 1000;
 +  }
 +
 +  @Override
 +  public long getMobScanCellsCount() {
 +    return 10;
 +  }
 +
 +  @Override
 +  public long getMobScanCellsSize() {
 +    return 1000;
 +  }
 +
 +  @Override
 +  public long getMobFileCacheAccessCount() {
 +    return 100;
 +  }
 +
 +  @Override
 +  public long getMobFileCacheMissCount() {
 +    return 50;
 +  }
 +
 +  @Override
 +  public long getMobFileCacheEvictedCount() {
 +    return 0;
 +  }
 +
 +  @Override
 +  public long getMobFileCacheCount() {
 +    return 100;
 +  }
 +
 +  @Override
 +  public int getMobFileCacheHitPercent() {
 +    return 50;
 +  }
- 
- }
+ }