Posted to commits@hbase.apache.org by jm...@apache.org on 2015/05/01 17:28:32 UTC

[46/50] [abbrv] hbase git commit: Merge branch 'apache/master' (4/16/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
index e4cad6f,0000000..ba0b620
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
@@@ -1,827 -1,0 +1,822 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.RejectedExecutionHandler;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.NamespaceDescriptor;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.Delete;
- import org.apache.hadoop.hbase.client.Durability;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 +import org.apache.hadoop.hbase.regionserver.HRegion;
 +import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 +import org.apache.hadoop.hbase.util.Threads;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(LargeTests.class)
 +public class TestMobFileCompactor {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private Configuration conf = null;
 +  private String tableNameAsString;
 +  private TableName tableName;
-   private HTable hTable;
++  private static Connection conn;
++  private BufferedMutator bufMut;
++  private Table hTable;
 +  private Admin admin;
 +  private HTableDescriptor desc;
 +  private HColumnDescriptor hcd1;
 +  private HColumnDescriptor hcd2;
 +  private FileSystem fs;
-   private final String family1 = "family1";
-   private final String family2 = "family2";
-   private final String qf1 = "qualifier1";
-   private final String qf2 = "qualifier2";
-   private byte[] KEYS = Bytes.toBytes("012");
-   private int regionNum = KEYS.length;
-   private int delRowNum = 1;
-   private int delCellNum = 6;
-   private int cellNumPerRow = 3;
-   private int rowNumPerFile = 2;
++  private static final String family1 = "family1";
++  private static final String family2 = "family2";
++  private static final String qf1 = "qualifier1";
++  private static final String qf2 = "qualifier2";
++  private static byte[] KEYS = Bytes.toBytes("012");
++  private static int regionNum = KEYS.length;
++  private static int delRowNum = 1;
++  private static int delCellNum = 6;
++  private static int cellNumPerRow = 3;
++  private static int rowNumPerFile = 2;
 +  private static ExecutorService pool;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
 +    TEST_UTIL.startMiniCluster(1);
 +    pool = createThreadPool(TEST_UTIL.getConfiguration());
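++    // Create one shared Connection for all tests; the pool passed in is used
++    // by the connection for its batch operations.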
++    conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
++    conn.close();
 +    pool.shutdown();
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  @Before
 +  public void setUp() throws Exception {
 +    fs = TEST_UTIL.getTestFileSystem();
 +    conf = TEST_UTIL.getConfiguration();
 +    long tid = System.currentTimeMillis();
 +    tableNameAsString = "testMob" + tid;
 +    tableName = TableName.valueOf(tableNameAsString);
 +    hcd1 = new HColumnDescriptor(family1);
 +    hcd1.setMobEnabled(true);
 +    hcd1.setMobThreshold(0L);
 +    hcd1.setMaxVersions(4);
 +    hcd2 = new HColumnDescriptor(family2);
 +    hcd2.setMobEnabled(true);
 +    hcd2.setMobThreshold(0L);
 +    hcd2.setMaxVersions(4);
 +    desc = new HTableDescriptor(tableName);
 +    desc.addFamily(hcd1);
 +    desc.addFamily(hcd2);
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc, getSplitKeys());
-     hTable = new HTable(conf, tableNameAsString);
-     hTable.setAutoFlush(false, false);
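++    // A BufferedMutator now provides the client-side write buffering that
++    // HTable with autoflush disabled used to provide.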
++    hTable = conn.getTable(tableName);
++    bufMut = conn.getBufferedMutator(tableName);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(tableName);
 +    admin.deleteTable(tableName);
 +    admin.close();
 +    hTable.close();
++    bufMut.close();
 +    fs.delete(TEST_UTIL.getDataTestDir(), true);
 +  }
 +
 +  @Test
 +  public void testCompactionWithoutDelFilesWithNamespace() throws Exception {
 +    resetConf();
 +    // create a table with namespace
 +    NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
 +    String tableNameAsString = "ns:testCompactionWithoutDelFilesWithNamespace";
 +    admin.createNamespace(namespaceDescriptor);
 +    TableName tableName = TableName.valueOf(tableNameAsString);
 +    HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
 +    hcd1.setMobEnabled(true);
 +    hcd1.setMobThreshold(0L);
 +    hcd1.setMaxVersions(4);
 +    HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
 +    hcd2.setMobEnabled(true);
 +    hcd2.setMobThreshold(0L);
 +    hcd2.setMaxVersions(4);
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    desc.addFamily(hcd1);
 +    desc.addFamily(hcd2);
 +    admin.createTable(desc, getSplitKeys());
-     HTable table = new HTable(conf, tableName);
-     table.setAutoFlush(false, false);
++    BufferedMutator bufMut = conn.getBufferedMutator(tableName);
++    Table table = conn.getTable(tableName);
 +
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, table, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count * rowNumPerFile;
 +
 +    assertEquals("Before compaction: mob rows count", regionNum * rowNumPerRegion,
 +      countMobRows(table));
 +    assertEquals("Before compaction: mob file count", regionNum * count,
 +      countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
 +
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum * rowNumPerRegion,
 +      countMobRows(table));
 +    assertEquals("After compaction: mob file count", regionNum,
 +      countFiles(tableName, true, family1));
 +    assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1));
 +
 +    table.close();
++    bufMut.close();
 +    admin.disableTable(tableName);
 +    admin.deleteTable(tableName);
 +    admin.deleteNamespace("ns");
 +  }
 +
 +  @Test
 +  public void testCompactionWithoutDelFiles() throws Exception {
 +    resetConf();
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob file count", regionNum * count,
 +      countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
 +
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob file count", regionNum,
 +      countFiles(tableName, true, family1));
 +    assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1));
 +  }
 +
 +  @Test
 +  public void testCompactionWithDelFiles() throws Exception {
 +    resetConf();
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
 +        countMobCells(hTable));
 +    assertEquals("Before deleting: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before deleting: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: family2 file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After compaction: family1 mob file count", regionNum,
 +        countFiles(tableName, true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After compaction: family1 del file count", 0,
 +      countFiles(tableName, false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +    assertRefFileNameEqual(family1);
 +  }
 +
 +  @Test
 +  public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
 +    resetConf();
 +    int mergeSize = 5000;
 +    // change the mob compaction merge size
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
 +        countMobCells(hTable));
 +    assertEquals("Before deleting: mob file count", regionNum * count,
 +      countFiles(tableName, true, family1));
 +
 +    int largeFilesCount = countLargeFiles(mergeSize, family1);
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    // After the compaction, the files smaller than the mob compaction merge size
 +    // are merged into one file per region
 +    assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
 +        countFiles(tableName, true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +  }
 +
 +  @Test
 +  public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
 +    resetConf();
 +    int batchSize = 2;
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before deleting: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
 +        countFiles(tableName, true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After compaction: family1 del file count", 0,
 +      countFiles(tableName, false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +  }
 +
 +  @Test
 +  public void testCompactionWithHFileLink() throws IOException, InterruptedException {
 +    resetConf();
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    long tid = System.currentTimeMillis();
 +    byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
 +    // take a snapshot
 +    admin.snapshot(snapshotName1, tableName);
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +
 +    // do the mob file compaction
 +    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
 +    compactor.compact();
 +
 +    assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After first compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After first compaction: family1 mob file count", regionNum,
 +        countFiles(tableName, true, family1));
 +    assertEquals("After first compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After first compaction: family1 del file count", 0,
 +      countFiles(tableName, false, family1));
 +    assertEquals("After first compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +    assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
 +    assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
 +
 +    admin.disableTable(tableName);
 +    // Restore from snapshot, the hfilelink will exist in mob dir
 +    admin.restoreSnapshot(snapshotName1);
 +    admin.enableTable(tableName);
 +
 +    assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("After restoring snapshot: mob cells count",
 +        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
 +    assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After restoring snapshot: family1 del file count", 0,
 +        countFiles(tableName, false, family1));
 +    assertEquals("After restoring snapshot: family2 del file count", 0,
 +        countFiles(tableName, false, family2));
 +    assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
 +        countHFileLinks(family1));
 +    assertEquals("After restoring snapshot: family2 hfilelink count", 0,
 +        countHFileLinks(family2));
 +
 +    compactor.compact();
 +
 +    assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("After second compaction: mob cells count",
 +        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
 +    assertEquals("After second compaction: family1 mob file count", regionNum,
 +        countFiles(tableName, true, family1));
 +    assertEquals("After second compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After second compaction: family1 del file count", 0,
 +      countFiles(tableName, false, family1));
 +    assertEquals("After second compaction: family2 del file count", 0,
 +      countFiles(tableName, false, family2));
 +    assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
 +    assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
 +    assertRefFileNameEqual(family1);
 +  }
 +
 +  @Test
 +  public void testCompactionFromAdmin() throws Exception {
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
 +        countMobCells(hTable));
 +    assertEquals("Before deleting: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before deleting: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: family2 file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +
 +    int largeFilesCount = countLargeFiles(5000, family1);
 +    // do the mob file compaction
 +    admin.compactMob(tableName, hcd1.getName());
 +
 +    waitUntilCompactionFinished(tableName);
 +    assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
 +      countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count", regionNum
 +      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(hTable));
 +    assertEquals("After compaction: family1 mob file count", regionNum + largeFilesCount,
 +      countFiles(tableName, true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum * count,
 +      countFiles(tableName, true, family2));
 +    assertEquals("After compaction: family1 del file count", regionNum,
 +      countFiles(tableName, false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +      countFiles(tableName, false, family2));
 +    assertRefFileNameEqual(family1);
 +  }
 +
 +  @Test
 +  public void testMajorCompactionFromAdmin() throws Exception {
 +    int count = 4;
 +    // generate mob files
-     loadData(admin, hTable, tableName, count, rowNumPerFile);
++    loadData(admin, bufMut, tableName, count, rowNumPerFile);
 +    int rowNumPerRegion = count*rowNumPerFile;
 +
 +    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
 +        countMobRows(hTable));
 +    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
 +        countMobCells(hTable));
 +    assertEquals("Before deleting: mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +
 +    createDelFile();
 +
 +    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("Before compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("Before compaction: family1 mob file count", regionNum*count,
 +        countFiles(tableName, true, family1));
 +    assertEquals("Before compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("Before compaction: family1 del file count", regionNum,
 +        countFiles(tableName, false, family1));
 +    assertEquals("Before compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +
 +    // do the major mob file compaction, which forces all files to be compacted
 +    admin.majorCompactMob(tableName, hcd1.getName());
 +
 +    waitUntilCompactionFinished(tableName);
 +    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
 +        countMobRows(hTable));
 +    assertEquals("After compaction: mob cells count",
 +        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
 +    assertEquals("After compaction: family1 mob file count", regionNum,
 +        countFiles(tableName, true, family1));
 +    assertEquals("After compaction: family2 mob file count", regionNum*count,
 +        countFiles(tableName, true, family2));
 +    assertEquals("After compaction: family1 del file count", 0,
 +        countFiles(tableName, false, family1));
 +    assertEquals("After compaction: family2 del file count", regionNum,
 +        countFiles(tableName, false, family2));
 +  }
 +
 +  private void waitUntilCompactionFinished(TableName tableName) throws IOException,
 +    InterruptedException {
 +    long finished = EnvironmentEdgeManager.currentTime() + 60000;
 +    CompactionState state = admin.getMobCompactionState(tableName);
 +    while (EnvironmentEdgeManager.currentTime() < finished) {
 +      if (state == CompactionState.NONE) {
 +        break;
 +      }
 +      state = admin.getMobCompactionState(tableName);
 +      Thread.sleep(10);
 +    }
 +    assertEquals(CompactionState.NONE, state);
 +  }
 +
 +  /**
 +   * Gets the number of rows in the given table.
 +   * @param table the table to scan
 +   * @return the number of rows
 +   */
-   private int countMobRows(final HTable table) throws IOException {
++  private int countMobRows(final Table table) throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      count++;
 +    }
 +    results.close();
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of cells in the given table.
 +   * @param table the table to scan
 +   * @return the number of cells
 +   */
-   private int countMobCells(final HTable table) throws IOException {
++  private int countMobCells(final Table table) throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      for (Cell cell : res.listCells()) {
 +        count++;
 +      }
 +    }
 +    results.close();
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of files in the mob path.
++   * @param tableName the table name
 +   * @param isMobFile true to count the mob files, false to count the del files
 +   * @param familyName the family name
 +   * @return the number of the files
 +   */
 +  private int countFiles(TableName tableName, boolean isMobFile, String familyName)
 +    throws IOException {
 +    Path mobDirPath = MobUtils.getMobFamilyPath(
 +        MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    int count = 0;
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = fs.listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        if (isMobFile) {
 +          if (!StoreFileInfo.isDelFile(file.getPath())) {
 +            count++;
 +          }
 +        } else {
 +          if (StoreFileInfo.isDelFile(file.getPath())) {
 +            count++;
 +          }
 +        }
 +      }
 +    }
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of HFileLink in the mob path.
 +   * @param familyName the family name
 +   * @return the number of the HFileLink
 +   */
 +  private int countHFileLinks(String familyName) throws IOException {
 +    Path mobDirPath = MobUtils.getMobFamilyPath(
 +        MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    int count = 0;
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = fs.listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        if (HFileLink.isHFileLink(file.getPath())) {
 +          count++;
 +        }
 +      }
 +    }
 +    return count;
 +  }
 +
 +  /**
 +   * Gets the number of files larger than the given size.
 +   * @param size the size threshold in bytes
 +   * @param familyName the family name
 +   * @return the number of files larger than the size
 +   */
 +  private int countLargeFiles(int size, String familyName) throws IOException {
 +    Path mobDirPath = MobUtils.getMobFamilyPath(
 +        MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    int count = 0;
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = fs.listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        // ignore the del files in the mob path
 +        if ((!StoreFileInfo.isDelFile(file.getPath()))
 +            && (file.getLen() > size)) {
 +          count++;
 +        }
 +      }
 +    }
 +    return count;
 +  }
 +
 +  /**
 +   * Loads data into the table.
-    * @param count the mob file number
++   * @param fileNum the number of mob files to create per region
++   * @param rowNumPerFile the number of rows in each mob file
 +   */
-   private void loadData(Admin admin, HTable table, TableName tableName, int fileNum,
++  private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum,
 +    int rowNumPerFile) throws IOException, InterruptedException {
 +    if (fileNum <= 0) {
 +      throw new IllegalArgumentException();
 +    }
 +    for (byte k0 : KEYS) {
 +      byte[] k = new byte[] { k0 };
 +      for (int i = 0; i < fileNum * rowNumPerFile; i++) {
 +        byte[] key = Bytes.add(k, Bytes.toBytes(i));
 +        byte[] mobVal = makeDummyData(10 * (i + 1));
 +        Put put = new Put(key);
 +        put.setDurability(Durability.SKIP_WAL);
-         put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
-         put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
-         put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
-         table.put(put);
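++        // Put#addColumn and BufferedMutator#mutate replace the deprecated
++        // Put#add and the buffered HTable#put.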
++        put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
++        put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
++        put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
++        table.mutate(put);
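++        // Flush the client buffer and the memstore every rowNumPerFile rows
++        // so that each batch lands in a mob file of its own.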
 +        if ((i + 1) % rowNumPerFile == 0) {
-           table.flushCommits();
++          table.flush();
 +          admin.flush(tableName);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Deletes a row, a family and a cell to create the del files
 +   */
 +  private void createDelFile() throws IOException, InterruptedException {
 +    for (byte k0 : KEYS) {
 +      byte[] k = new byte[] { k0 };
 +      // delete a family
 +      byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
 +      Delete delete1 = new Delete(key1);
-       delete1.deleteFamily(Bytes.toBytes(family1));
++      delete1.addFamily(Bytes.toBytes(family1));
 +      hTable.delete(delete1);
 +      // delete one row
 +      byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
 +      Delete delete2 = new Delete(key2);
 +      hTable.delete(delete2);
 +      // delete one cell
 +      byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
 +      Delete delete3 = new Delete(key3);
-       delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
++      delete3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
 +      hTable.delete(delete3);
-       hTable.flushCommits();
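++      // Deletes issued through Table are sent to the server immediately, so
++      // only the server-side flush is still needed.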
 +      admin.flush(tableName);
 +      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
 +          Bytes.toBytes(tableNameAsString));
 +      for (HRegion region : regions) {
 +        region.waitForFlushesAndCompactions();
-         region.compactStores(true);
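++        // compact(true) requests a major compaction, which writes the pending
++        // delete markers out as del files in the mob directory.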
++        region.compact(true);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Creates the dummy data with a specific size.
-    * @param the size of data
++   * @param size the size of value
 +   * @return the dummy data
 +   */
 +  private byte[] makeDummyData(int size) {
 +    byte[] dummyData = new byte[size];
 +    new Random().nextBytes(dummyData);
 +    return dummyData;
 +  }
 +
 +  /**
 +   * Gets the split keys
 +   */
 +  private byte[][] getSplitKeys() {
 +    byte[][] splitKeys = new byte[KEYS.length - 1][];
 +    for (int i = 0; i < splitKeys.length; ++i) {
 +      splitKeys[i] = new byte[] { KEYS[i + 1] };
 +    }
 +    return splitKeys;
 +  }
 +
 +  private static ExecutorService createThreadPool(Configuration conf) {
 +    int maxThreads = 10;
 +    long keepAliveTime = 60;
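++    // A SynchronousQueue has no capacity, so the rejection handler below
++    // blocks the submitter until a worker thread takes the task.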
 +    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
 +    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
 +        keepAliveTime, TimeUnit.SECONDS, queue,
 +        Threads.newDaemonThreadFactory("MobFileCompactionChore"),
 +        new RejectedExecutionHandler() {
 +          @Override
 +          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 +            try {
 +              // waiting for a thread to pick up instead of throwing exceptions.
 +              queue.put(r);
 +            } catch (InterruptedException e) {
 +              throw new RejectedExecutionException(e);
 +            }
 +          }
 +        });
 +    pool.allowCoreThreadTimeOut(true);
 +    return pool;
 +  }
 +
 +  private void assertRefFileNameEqual(String familyName) throws IOException {
 +    Scan scan = new Scan();
 +    scan.addFamily(Bytes.toBytes(familyName));
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner results = hTable.getScanner(scan);
 +    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
 +        tableName), familyName);
 +    List<Path> actualFilePaths = new ArrayList<>();
 +    List<Path> expectFilePaths = new ArrayList<>();
 +    for (Result res : results) {
 +      for (Cell cell : res.listCells()) {
 +        byte[] referenceValue = CellUtil.cloneValue(cell);
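++        // A mob reference value is the length of the original value (an int)
++        // followed by the name of the mob file that holds the value.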
 +        String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
 +            referenceValue.length - Bytes.SIZEOF_INT);
 +        Path targetPath = new Path(mobFamilyPath, fileName);
 +        if(!actualFilePaths.contains(targetPath)) {
 +          actualFilePaths.add(targetPath);
 +        }
 +      }
 +    }
 +    results.close();
 +    if (fs.exists(mobFamilyPath)) {
 +      FileStatus[] files = fs.listStatus(mobFamilyPath);
 +      for (FileStatus file : files) {
 +        if (!StoreFileInfo.isDelFile(file.getPath())) {
 +          expectFilePaths.add(file.getPath());
 +        }
 +      }
 +    }
 +    Collections.sort(actualFilePaths);
 +    Collections.sort(expectFilePaths);
 +    assertEquals(expectFilePaths, actualFilePaths);
 +  }
 +
 +  /**
 +   * Resets the configuration.
 +   */
 +  private void resetConf() {
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
index 3c73d52,0000000..ed3853e
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
@@@ -1,446 -1,0 +1,441 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.RejectedExecutionHandler;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValue.Type;
++import org.apache.hadoop.hbase.regionserver.*;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
- import org.apache.hadoop.hbase.regionserver.BloomType;
- import org.apache.hadoop.hbase.regionserver.HStore;
- import org.apache.hadoop.hbase.regionserver.ScanInfo;
- import org.apache.hadoop.hbase.regionserver.ScanType;
- import org.apache.hadoop.hbase.regionserver.StoreFile;
- import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
- import org.apache.hadoop.hbase.regionserver.StoreScanner;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.Threads;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(LargeTests.class)
 +public class TestPartitionedMobFileCompactor {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static String family = "family";
 +  private final static String qf = "qf";
 +  private HColumnDescriptor hcd = new HColumnDescriptor(family);
 +  private Configuration conf = TEST_UTIL.getConfiguration();
 +  private CacheConfig cacheConf = new CacheConfig(conf);
 +  private FileSystem fs;
 +  private List<FileStatus> mobFiles = new ArrayList<>();
 +  private List<FileStatus> delFiles = new ArrayList<>();
 +  private List<FileStatus> allFiles = new ArrayList<>();
 +  private Path basePath;
 +  private String mobSuffix;
 +  private String delSuffix;
 +  private static ExecutorService pool;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +    TEST_UTIL.startMiniCluster(1);
-     pool = createThreadPool(TEST_UTIL.getConfiguration());
++    pool = createThreadPool();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    pool.shutdown();
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void init(String tableName) throws Exception {
 +    fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
 +    basePath = new Path(new Path(mobTestDir, tableName), family);
 +    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
 +    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithAllFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithAllFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
 +    List<String> expectedStartKeys = new ArrayList<>();
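++    // The first 32 characters of a mob file name encode its start key; only
++    // files below the merge threshold are expected to be selected.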
 +    for(FileStatus file : mobFiles) {
 +      if(file.getLen() < mergeSize) {
 +        String fileName = file.getPath().getName();
 +        String startKey = fileName.substring(0, 32);
 +        expectedStartKeys.add(startKey);
 +      }
 +    }
 +    testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithPartFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithPartFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = 4000;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for(FileStatus file : mobFiles) {
 +      if(file.getLen() < mergeSize) {
 +        String fileName = file.getPath().getName();
 +        String startKey = fileName.substring(0, 32);
 +        expectedStartKeys.add(startKey);
 +      }
 +    }
 +    // set the mob file compaction mergeable threshold
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +    testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithForceAllFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithForceAllFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = 4000;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for(FileStatus file : mobFiles) {
 +      String fileName = file.getPath().getName();
 +      String startKey = fileName.substring(0, 32);
 +      expectedStartKeys.add(startKey);
 +    }
 +    // set the mob file compaction mergeable threshold
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +    testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithDefaultBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +    testCompactDelFiles(tableName, 1, 13, false);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithSmallBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +
 +    // set the mob file compaction batch size
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
 +    testCompactDelFiles(tableName, 1, 13, false);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithSmallBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +
 +    // set the max del file count
 +    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
 +    // set the mob file compaction batch size
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
 +    testCompactDelFiles(tableName, 4, 13, false);
 +  }
 +
 +  /**
 +   * Tests the file selection of the compactor.
 +   * @param tableName the table name
 +   * @param type the expected compaction type
++   * @param isForceAllFiles whether all the mob files are forced into the compaction
 +   * @param expected the expected start keys
 +   */
 +  private void testSelectFiles(String tableName, final CompactionType type,
 +    final boolean isForceAllFiles, final List<String> expected) throws IOException {
 +    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
 +      TableName.valueOf(tableName), hcd, pool) {
 +      @Override
 +      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
 +        throws IOException {
 +        if (files == null || files.isEmpty()) {
 +          return null;
 +        }
 +        PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
 +        // assert the compaction type
 +        Assert.assertEquals(type, request.type);
 +        // assert get the right partitions
 +        compareCompactedPartitions(expected, request.compactionPartitions);
 +        // assert get the right del files
 +        compareDelFiles(request.delFiles);
 +        return null;
 +      }
 +    };
 +    compactor.compact(allFiles, isForceAllFiles);
 +  }
 +
 +  /**
 +   * Tests the compaction of the del files.
 +   * @param tableName the table name
 +   * @param expectedFileCount the expected file count
 +   * @param expectedCellCount the expected cell count
++   * @param isForceAllFiles whether all the mob files are forced into the compaction
 +   */
 +  private void testCompactDelFiles(String tableName, final int expectedFileCount,
 +      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
 +    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
 +      TableName.valueOf(tableName), hcd, pool) {
 +      @Override
 +      protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
 +          throws IOException {
 +        List<Path> delFilePaths = new ArrayList<Path>();
 +        for (FileStatus delFile : request.delFiles) {
 +          delFilePaths.add(delFile.getPath());
 +        }
 +        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
 +        // assert the del files are merged.
 +        Assert.assertEquals(expectedFileCount, newDelPaths.size());
 +        Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
 +        return null;
 +      }
 +    };
 +    compactor.compact(allFiles, isForceAllFiles);
 +  }
 +
 +  /**
 +   * Lists the files in the path
 +   */
 +  private void listFiles() throws IOException {
 +    for (FileStatus file : fs.listStatus(basePath)) {
 +      allFiles.add(file);
 +      if (file.getPath().getName().endsWith("_del")) {
 +        delFiles.add(file);
 +      } else {
 +        mobFiles.add(file);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Compares the compacted partitions.
 +   * @param partitions the collection of CompactedPartitions
 +   */
 +  private void compareCompactedPartitions(List<String> expected,
 +      Collection<CompactionPartition> partitions) {
 +    List<String> actualKeys = new ArrayList<>();
 +    for (CompactionPartition partition : partitions) {
 +      actualKeys.add(partition.getPartitionId().getStartKey());
 +    }
 +    Collections.sort(expected);
 +    Collections.sort(actualKeys);
 +    Assert.assertEquals(expected.size(), actualKeys.size());
 +    for (int i = 0; i < expected.size(); i++) {
 +      Assert.assertEquals(expected.get(i), actualKeys.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Compares the del files.
 +   * @param allDelFiles all the del files
 +   */
 +  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
 +    int i = 0;
 +    for (FileStatus file : allDelFiles) {
 +      Assert.assertEquals(delFiles.get(i), file);
 +      i++;
 +    }
 +  }
 +
 +  /**
 +   * Creates store files.
 +   * @param basePath the path to create the files in
 +   * @param family the family name
 +   * @param qualifier the column qualifier
 +   * @param count the number of store files
 +   * @param type the key type
 +   */
 +  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
 +      Type type) throws IOException {
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
 +    String startKey = "row_";
 +    MobFileName mobFileName = null;
 +    for (int i = 0; i < count; i++) {
 +      byte[] startRow = Bytes.toBytes(startKey + i);
 +      if(type.equals(Type.Delete)) {
 +        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
 +            new Date()), delSuffix);
 +      }
 +      if(type.equals(Type.Put)){
 +        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
 +            new Date()), mobSuffix);
 +      }
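++      // Each store file holds a single cell whose value size grows with i,
++      // so the merge-threshold tests can tell small files from large ones.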
 +      StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +      .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
 +      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
 +          type, (i+1)*1000);
 +    }
 +  }
 +
 +  /**
 +   * Writes data to store file.
 +   * @param writer the store file writer
 +   * @param row the row key
 +   * @param family the family name
 +   * @param qualifier the column qualifier
 +   * @param type the key type
 +   * @param size the size of value
 +   */
 +  private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
 +      byte[] qualifier, Type type, int size) throws IOException {
 +    long now = System.currentTimeMillis();
 +    try {
 +      byte[] dummyData = new byte[size];
 +      new Random().nextBytes(dummyData);
 +      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
 +    } finally {
 +      writer.close();
 +    }
 +  }
 +
 +  /**
 +   * Gets the number of del cells in the del files
 +   * @param paths the del file paths
 +   * @return the number of del cells
 +   */
 +  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
 +    List<StoreFile> sfs = new ArrayList<StoreFile>();
 +    int size = 0;
 +    for(Path path : paths) {
 +      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
 +      sfs.add(sf);
 +    }
 +    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
 +        false, null, HConstants.LATEST_TIMESTAMP);
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(hcd.getMaxVersions());
 +    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
 +    long ttl = HStore.determineTTLFromFamily(hcd);
 +    ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
 +    StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
 +        scanners, 0L, HConstants.LATEST_TIMESTAMP);
 +    List<Cell> results = new ArrayList<>();
 +    boolean hasMore = true;
++
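++    // Drain the scanner; COMPACT_RETAIN_DELETES keeps the delete markers, so
++    // every del cell in the del files is counted.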
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      size += results.size();
 +      results.clear();
 +    }
 +    scanner.close();
 +    return size;
 +  }
 +
-   private static ExecutorService createThreadPool(Configuration conf) {
++  private static ExecutorService createThreadPool() {
 +    int maxThreads = 10;
 +    long keepAliveTime = 60;
 +    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
 +    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
 +      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
 +      new RejectedExecutionHandler() {
 +        @Override
 +        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 +          try {
 +            // wait for a free thread to pick up the task instead of throwing an exception
 +            queue.put(r);
 +          } catch (InterruptedException e) {
 +            throw new RejectedExecutionException(e);
 +          }
 +        }
 +      });
 +    pool.allowCoreThreadTimeOut(true);
 +    return pool;
 +  }
 +
 +  /**
 +   * Resets the mob compaction settings to their default values.
 +   */
 +  private void resetConf() {
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
index 49345e4,0000000..3023849
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@@ -1,168 -1,0 +1,169 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
++import static org.junit.Assert.assertArrayEquals;
 +import static org.junit.Assert.assertEquals;
 +
 +import java.io.IOException;
 +import java.util.List;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.CommonConfigurationKeys;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.serializer.JavaSerialization;
 +import org.apache.hadoop.io.serializer.WritableSerialization;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobSweepJob {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
 +        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
 +    TEST_UTIL.startMiniCluster();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
 +      String[] fileNames) throws IOException {
 +    // write the names to a sequence file
 +    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
 +        String.class, String.class);
 +    try {
 +      for (String fileName : fileNames) {
 +        writer.append(fileName, MobConstants.EMPTY_STRING);
 +      }
 +    } finally {
 +      IOUtils.closeStream(writer);
 +    }
 +  }
 +
 +  @Test
 +  public void testSweeperJobWithoutUnusedFile() throws Exception {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Configuration configuration = new Configuration(
 +        TEST_UTIL.getConfiguration());
 +    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
 +    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/0/all");
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
 +        visitedFileNamesPath.toString());
 +    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
 +        allFileNamesPath.toString());
 +
 +    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
 +        "2", "3", "4", "5", "6"});
 +
 +    Path r0 = new Path(visitedFileNamesPath, "r0");
 +    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
 +    Path r1 = new Path(visitedFileNamesPath, "r1");
 +    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5" });
 +    Path r2 = new Path(visitedFileNamesPath, "r2");
 +    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6" });
 +
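 +    // every file name in the 'all' list is visited by some region, so nothing is unused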
 +    SweepJob sweepJob = new SweepJob(configuration, fs);
 +    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
 +
 +    assertEquals(0, toBeArchived.size());
 +  }
 +
 +  @Test
 +  public void testSweeperJobWithUnusedFile() throws Exception {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Configuration configuration = new Configuration(
 +        TEST_UTIL.getConfiguration());
 +    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
 +    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/1/all");
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
 +        visitedFileNamesPath.toString());
 +    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
 +        allFileNamesPath.toString());
 +
 +    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
 +        "2", "3", "4", "5", "6"});
 +
 +    Path r0 = new Path(visitedFileNamesPath, "r0");
 +    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
 +    Path r1 = new Path(visitedFileNamesPath, "r1");
 +    writeFileNames(fs, configuration, r1, new String[] { "1", "5" });
 +    Path r2 = new Path(visitedFileNamesPath, "r2");
 +    writeFileNames(fs, configuration, r2, new String[] { "2", "3" });
 +
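 +    // "4" and "6" are never visited, so they are the files expected to be archived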
 +    SweepJob sweepJob = new SweepJob(configuration, fs);
 +    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
 +
 +    assertEquals(2, toBeArchived.size());
-     assertEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
++    assertArrayEquals(new String[]{"4", "6"}, toBeArchived.toArray(new String[0]));
 +  }
 +
 +  @Test
 +  public void testSweeperJobWithRedundantFile() throws Exception {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Configuration configuration = new Configuration(
 +        TEST_UTIL.getConfiguration());
 +    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
 +    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
 +        "/hbase/mobcompaction/SweepJob/working/names/2/all");
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
 +        visitedFileNamesPath.toString());
 +    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
 +        allFileNamesPath.toString());
 +
 +    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
 +        "2", "3", "4", "5", "6"});
 +
 +    Path r0 = new Path(visitedFileNamesPath, "r0");
 +    writeFileNames(fs, configuration, r0, new String[] { "1", "2", "3" });
 +    Path r1 = new Path(visitedFileNamesPath, "r1");
 +    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7" });
 +    Path r2 = new Path(visitedFileNamesPath, "r2");
 +    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4" });
 +
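 +    // "7" is visited but absent from the 'all' list; the redundant entry is ignored and nothing is archived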
 +    SweepJob sweepJob = new SweepJob(configuration, fs);
 +    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
 +
 +    assertEquals(0, toBeArchived.size());
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
index 308b50e,0000000..8c24123
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@@ -1,220 -1,0 +1,219 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.when;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.TreeSet;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.CommonConfigurationKeys;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.ServerName;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 +import org.apache.hadoop.hbase.master.TableLockManager;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.io.serializer.JavaSerialization;
 +import org.apache.hadoop.mapreduce.Counter;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.counters.GenericCounter;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.mockito.Matchers;
 +
 +@Category(MediumTests.class)
 +public class TestMobSweepReducer {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static String tableName = "testSweepReducer";
 +  private final static String row = "row";
 +  private final static String family = "family";
 +  private final static String qf = "qf";
-   private static HTable table;
++  private static BufferedMutator table;
 +  private static Admin admin;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  @SuppressWarnings("deprecation")
 +  @Before
 +  public void setUp() throws Exception {
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++            .getBufferedMutator(TableName.valueOf(tableName));
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(TableName.valueOf(tableName));
 +    admin.deleteTable(TableName.valueOf(tableName));
 +    admin.close();
 +  }
 +
 +  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
 +                                              Configuration conf) throws Exception {
 +    List<String> list = new ArrayList<String>();
 +    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 +
 +    String next = (String) reader.next((String) null);
 +    while (next != null) {
 +      list.add(next);
 +      next = (String) reader.next((String) null);
 +    }
 +    reader.close();
 +    return list;
 +  }
 +
 +  @Test
 +  public void testRun() throws Exception {
 +
 +    TableName tn = TableName.valueOf(tableName);
 +    byte[] mobValueBytes = new byte[100];
 +
 +    // get the path where the mob files are stored
 +    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
 +
 +    Put put = new Put(Bytes.toBytes(row));
-     put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
++    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
 +    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
-     put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
-     table.put(put);
-     table.put(put2);
-     table.flushCommits();
++    put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
++    table.mutate(put);
++    table.mutate(put2);
++    table.flush();
 +    admin.flush(tn);
 +
 +    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    // verify that exactly one mob file has been generated
 +    assertEquals(1, fileStatuses.length);
 +
 +    String mobFile1 = fileStatuses[0].getPath().getName();
 +
 +    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
 +    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
 +    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
 +    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
 +    configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
 +    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
 +    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
 +            JavaSerialization.class.getName());
 +    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
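 +    // push the compaction start date a day into the future, presumably so the
 +    // freshly flushed mob file is old enough to be swept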
 +    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
 +        System.currentTimeMillis() + 24 * 3600 * 1000);
 +
 +    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
 +    TableName lockName = MobUtils.getTableLockName(tn);
 +    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
 +    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
 +    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
 +    ServerName serverName = SweepJob.getCurrentServerName(configuration);
 +    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
 +
 +    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
 +        serverName);
 +    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
 +    lock.acquire();
 +    try {
 +      // use the same counter when mocking
 +      Counter counter = new GenericCounter();
 +      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
 +      when(ctx.getConfiguration()).thenReturn(configuration);
 +      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
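 +      // drive the reducer through exactly one key: the name of the mob file flushed above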
 +      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
 +      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
 +
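 +      // a mob reference cell stores the original value length followed by the mob file name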
 +      byte[] refBytes = Bytes.toBytes(mobFile1);
 +      long valueLength = refBytes.length;
 +      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
 +      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
 +        KeyValue.Type.Put, newValue);
 +      List<KeyValue> list = new ArrayList<KeyValue>();
 +      list.add(kv2);
 +
 +      when(ctx.getValues()).thenReturn(list);
 +
 +      SweepReducer reducer = new SweepReducer();
 +      reducer.run(ctx);
 +    } finally {
 +      lock.release();
 +    }
 +    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    String mobFile2 = fileStatuses2[0].getPath().getName();
 +    // a new mob file has been generated and the old one archived
 +    assertEquals(1, fileStatuses2.length);
 +    assertFalse(mobFile2.equalsIgnoreCase(mobFile1));
 +
 +    // verify the visited file names recorded in the sequence files
 +    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
 +    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
 +    Set<String> files = new TreeSet<String>();
 +    for (FileStatus st : statuses) {
 +      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
 +              st.getPath(), configuration));
 +    }
 +    assertEquals(1, files.size());
 +    assertTrue(files.contains(mobFile1));
 +  }
 +}