You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/07/22 21:52:26 UTC

[08/50] [abbrv] hbase git commit: Merge branch 'apache/master' (4/16/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
index 8a017a2,0000000..31778ae
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@@ -1,310 -1,0 +1,308 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.TreeSet;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.NamespaceDescriptor;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobSweeper {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private String tableName;
 +  private final static String row = "row_";
 +  private final static String family = "family";
 +  private final static String column = "column";
-   private static HTable table;
++  private static Table table;
++  private static BufferedMutator bufMut;
 +  private static Admin admin;
 +
 +  private Random random = new Random();
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15); // avoid major compactions
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30); // avoid major compactions
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +
 +    TEST_UTIL.startMiniCluster();
 +
 +    TEST_UTIL.startMiniMapReduceCluster();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +    TEST_UTIL.shutdownMiniMapReduceCluster();
 +  }
 +
 +  @SuppressWarnings("deprecation")
 +  @Before
 +  public void setUp() throws Exception {
 +    long tid = System.currentTimeMillis();
 +    tableName = "testSweeper" + tid;
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), tableName);
-     table.setAutoFlush(false, false);
- 
++    Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
++    TableName tn = TableName.valueOf(tableName);
++    table = c.getTable(tn);
++    bufMut = c.getBufferedMutator(tn);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(TableName.valueOf(tableName));
 +    admin.deleteTable(TableName.valueOf(tableName));
 +    admin.close();
 +  }
 +
 +  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
 +                                String familyName) {
 +    Path p = new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
 +            familyName);
 +    return p;
 +  }
 +
 +  private String mergeString(Set<String> set) {
 +    StringBuilder sb = new StringBuilder();
 +    for (String s : set)
 +      sb.append(s);
 +    return sb.toString();
 +  }
 +
-   private void generateMobTable(Admin admin, HTable table, String tableName, int count,
++  private void generateMobTable(Admin admin, BufferedMutator table, String tableName, int count,
 +    int flushStep) throws IOException, InterruptedException {
 +    if (count <= 0 || flushStep <= 0)
 +      return;
 +    int index = 0;
 +    for (int i = 0; i < count; i++) {
 +      byte[] mobVal = new byte[101*1024];
 +      random.nextBytes(mobVal);
 +
 +      Put put = new Put(Bytes.toBytes(row + i));
-       put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
-       table.put(put);
++      put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
++      table.mutate(put);
 +      if (index++ % flushStep == 0) {
-         table.flushCommits();
++        table.flush();
 +        admin.flush(TableName.valueOf(tableName));
 +      }
 +    }
-     table.flushCommits();
++    table.flush();
 +    admin.flush(TableName.valueOf(tableName));
 +  }
 +
 +  @Test
 +  public void testSweeper() throws Exception {
 +    int count = 10;
 +    //create table and generate 10 mob files
-     generateMobTable(admin, table, tableName, count, 1);
++    generateMobTable(admin, bufMut, tableName, count, 1);
 +    //get mob files
 +    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
-     // mobFileSet0 stores the orignal mob files
++    // mobFileSet0 stores the original mob files
 +    TreeSet<String> mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    //scan the table, retreive the references
 +    Scan scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScanned = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
 +          Bytes.toBytes(column));
 +      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +    //there should be 10 mob files
 +    assertEquals(10, mobFilesScanned.size());
 +    //check if we store the correct reference of mob files
 +    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
 +
 +    Configuration conf = TEST_UTIL.getConfiguration();
 +    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
 +
 +    String[] args = new String[2];
 +    args[0] = tableName;
 +    args[1] = family;
 +    assertEquals(0, ToolRunner.run(conf, new Sweeper(), args));
 +
 +    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +    assertEquals(10, mobFilesSet.size());
 +
 +    scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
 +          column));
 +      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +    assertEquals(10, mobFilesScannedAfterJob.size());
 +
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +    assertEquals(10, mobFilesSet.size());
 +    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
 +            .equalsIgnoreCase(mobFilesSet.iterator().next()));
 +  }
 +
-   private void testCompactionDelaySweeperInternal(HTable table, String tableName)
++  private void testCompactionDelaySweeperInternal(Table table, BufferedMutator bufMut, String tableName)
 +    throws Exception {
 +    int count = 10;
 +    //create table and generate 10 mob files
-     generateMobTable(admin, table, tableName, count, 1);
++    generateMobTable(admin, bufMut, tableName, count, 1);
 +    //get mob files
 +    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    // mobFileSet0 stores the orignal mob files
 +    TreeSet<String> mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +
 +    //scan the table, retreive the references
 +    Scan scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScanned = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
 +              Bytes.toBytes(column));
 +      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +    //there should be 10 mob files
 +    assertEquals(10, mobFilesScanned.size());
 +    //check if we store the correct reference of mob files
 +    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
 +
 +    Configuration conf = TEST_UTIL.getConfiguration();
 +    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
 +    String[] args = new String[2];
 +    args[0] = tableName;
 +    args[1] = family;
 +    assertEquals(0, ToolRunner.run(conf, new Sweeper(), args));
 +
 +    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +    assertEquals(1, mobFilesSet.size());
 +
 +    scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
 +    rs = table.getScanner(scan);
 +    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
 +    for (Result res : rs) {
 +      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
 +              column));
 +      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
 +          valueBytes.length - Bytes.SIZEOF_INT));
 +    }
 +    assertEquals(1, mobFilesScannedAfterJob.size());
 +
 +    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
 +    mobFilesSet = new TreeSet<String>();
 +    for (FileStatus status : fileStatuses) {
 +      mobFilesSet.add(status.getPath().getName());
 +    }
 +    assertEquals(1, mobFilesSet.size());
 +    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
 +            .equalsIgnoreCase(mobFilesSet.iterator().next()));
 +  }
 +
 +  @Test
 +  public void testCompactionDelaySweeper() throws Exception {
-     testCompactionDelaySweeperInternal(table, tableName);
++    testCompactionDelaySweeperInternal(table, bufMut, tableName);
 +  }
 +
 +  @Test
 +  public void testCompactionDelaySweeperWithNamespace() throws Exception {
 +    // create a table with namespace
 +    NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
 +    admin.createNamespace(namespaceDescriptor);
 +    String tableNameAsString = "ns:testSweeperWithNamespace";
 +    TableName tableName = TableName.valueOf(tableNameAsString);
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +    admin.createTable(desc);
-     HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
-     table.setAutoFlush(false, false);
-     testCompactionDelaySweeperInternal(table, tableNameAsString);
++    Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
++    BufferedMutator bufMut = c.getBufferedMutator(tableName);
++    Table table = c.getTable(tableName);
++    testCompactionDelaySweeperInternal(table, bufMut, tableNameAsString);
 +    table.close();
 +    admin.disableTable(tableName);
 +    admin.deleteTable(tableName);
 +    admin.deleteNamespace("ns");
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
index 028e602,0000000..6dbcec0
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
@@@ -1,225 -1,0 +1,218 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestDeleteMobTable {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static byte[] FAMILY = Bytes.toBytes("family");
 +  private final static byte[] QF = Bytes.toBytes("qualifier");
 +  private static Random random = new Random();
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  /**
 +   * Generate the mob value.
 +   *
 +   * @param size
 +   *          the size of the value
 +   * @return the mob value generated
 +   */
 +  private static byte[] generateMobValue(int size) {
 +    byte[] mobVal = new byte[size];
 +    random.nextBytes(mobVal);
 +    return mobVal;
 +  }
 +
 +  @Test
 +  public void testDeleteMobTable() throws Exception {
 +    byte[] tableName = Bytes.toBytes("testDeleteMobTable");
 +    TableName tn = TableName.valueOf(tableName);
 +    HTableDescriptor htd = new HTableDescriptor(tn);
 +    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(0);
 +    htd.addFamily(hcd);
 +    HBaseAdmin admin = null;
-     HTable table = null;
++    Table table = null;
 +    try {
-       admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
++      admin = TEST_UTIL.getHBaseAdmin();
 +      admin.createTable(htd);
-       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++      table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
 +      byte[] value = generateMobValue(10);
 +
 +      byte[] row = Bytes.toBytes("row");
 +      Put put = new Put(row);
-       put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
++      put.addColumn(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
 +      table.put(put);
 +
-       table.flushCommits();
-       admin.flush(tableName);
++      admin.flush(tn);
 +
 +      // the mob file exists
 +      Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
 +      Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
 +      Assert.assertTrue(mobTableDirExist(tn));
 +      table.close();
 +
 +      admin.disableTable(tn);
 +      admin.deleteTable(tn);
 +
 +      Assert.assertFalse(admin.tableExists(tn));
 +      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
 +      Assert.assertFalse(mobTableDirExist(tn));
 +    } finally {
 +      if (admin != null) {
 +        admin.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testDeleteNonMobTable() throws Exception {
 +    byte[] tableName = Bytes.toBytes("testDeleteNonMobTable");
 +    TableName tn = TableName.valueOf(tableName);
 +    HTableDescriptor htd = new HTableDescriptor(tn);
 +    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
 +    htd.addFamily(hcd);
 +    HBaseAdmin admin = null;
-     HTable table = null;
++    Table table = null;
 +    try {
-       admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
++      admin = TEST_UTIL.getHBaseAdmin();
 +      admin.createTable(htd);
-       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++      table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
 +      byte[] value = generateMobValue(10);
 +
 +      byte[] row = Bytes.toBytes("row");
 +      Put put = new Put(row);
-       put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
++      put.addColumn(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
 +      table.put(put);
 +
-       table.flushCommits();
-       admin.flush(tableName);
++      admin.flush(tn);
 +      table.close();
 +
 +      // the mob file doesn't exist
 +      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertFalse(mobTableDirExist(tn));
 +
 +      admin.disableTable(tn);
 +      admin.deleteTable(tn);
 +
 +      Assert.assertFalse(admin.tableExists(tn));
 +      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertFalse(mobTableDirExist(tn));
 +    } finally {
 +      if (admin != null) {
 +        admin.close();
 +      }
 +    }
 +  }
 +
 +  private int countMobFiles(TableName tn, String familyName) throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName);
 +    if (fs.exists(mobFileDir)) {
 +      return fs.listStatus(mobFileDir).length;
 +    } else {
 +      return 0;
 +    }
 +  }
 +
 +  private int countArchiveMobFiles(TableName tn, String familyName)
 +      throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
 +        MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
 +    if (fs.exists(storePath)) {
 +      return fs.listStatus(storePath).length;
 +    } else {
 +      return 0;
 +    }
 +  }
 +
 +  private boolean mobTableDirExist(TableName tn) throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
 +    return fs.exists(tableDir);
 +  }
 +
 +  private boolean mobArchiveExist(TableName tn, String familyName, String fileName)
 +      throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
 +        MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
 +    return fs.exists(new Path(storePath, fileName));
 +  }
 +
-   private String assertHasOneMobRow(HTable table, TableName tn, String familyName)
++  private String assertHasOneMobRow(Table table, TableName tn, String familyName)
 +      throws IOException {
 +    Scan scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner rs = table.getScanner(scan);
 +    Result r = rs.next();
 +    Assert.assertNotNull(r);
 +    byte[] value = r.getValue(FAMILY, QF);
 +    String fileName = Bytes.toString(value, Bytes.SIZEOF_INT, value.length - Bytes.SIZEOF_INT);
 +    Path filePath = new Path(
 +        MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName);
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Assert.assertTrue(fs.exists(filePath));
 +    r = rs.next();
 +    Assert.assertNull(r);
 +    return fileName;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 3a0f9be,0000000..39fd410
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@@ -1,472 -1,0 +1,472 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NavigableSet;
 +import java.util.concurrent.ConcurrentSkipListSet;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseConfiguration;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
 +import org.apache.hadoop.hbase.client.Get;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.wal.WAL;
 +import org.apache.hadoop.hbase.wal.WALFactory;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.junit.rules.TestName;
 +import org.mockito.Mockito;
 +
 +@Category(MediumTests.class)
 +public class TestHMobStore {
 +  public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
 +  @Rule public TestName name = new TestName();
 +
 +  private HMobStore store;
 +  private HRegion region;
 +  private HColumnDescriptor hcd;
 +  private FileSystem fs;
 +  private byte [] table = Bytes.toBytes("table");
 +  private byte [] family = Bytes.toBytes("family");
 +  private byte [] row = Bytes.toBytes("row");
 +  private byte [] row2 = Bytes.toBytes("row2");
 +  private byte [] qf1 = Bytes.toBytes("qf1");
 +  private byte [] qf2 = Bytes.toBytes("qf2");
 +  private byte [] qf3 = Bytes.toBytes("qf3");
 +  private byte [] qf4 = Bytes.toBytes("qf4");
 +  private byte [] qf5 = Bytes.toBytes("qf5");
 +  private byte [] qf6 = Bytes.toBytes("qf6");
 +  private byte[] value = Bytes.toBytes("value");
 +  private byte[] value2 = Bytes.toBytes("value2");
 +  private Path mobFilePath;
 +  private Date currentDate = new Date();
 +  private KeyValue seekKey1;
 +  private KeyValue seekKey2;
 +  private KeyValue seekKey3;
 +  private NavigableSet<byte[]> qualifiers =
 +    new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
 +  private List<Cell> expected = new ArrayList<Cell>();
 +  private long id = System.currentTimeMillis();
 +  private Get get = new Get(row);
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
 +
 +  /**
 +   * Setup
 +   * @throws Exception
 +   */
 +  @Before
 +  public void setUp() throws Exception {
 +    qualifiers.add(qf1);
 +    qualifiers.add(qf3);
 +    qualifiers.add(qf5);
 +
 +    Iterator<byte[]> iter = qualifiers.iterator();
 +    while(iter.hasNext()){
 +      byte [] next = iter.next();
 +      expected.add(new KeyValue(row, family, next, 1, value));
 +      get.addColumn(family, next);
 +      get.setMaxVersions(); // all versions.
 +    }
 +  }
 +
 +  private void init(String methodName, Configuration conf, boolean testStore)
 +  throws IOException {
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    init(methodName, conf, hcd, testStore);
 +  }
 +
 +  private void init(String methodName, Configuration conf,
 +      HColumnDescriptor hcd, boolean testStore) throws IOException {
 +    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
 +    init(methodName, conf, htd, hcd, testStore);
 +  }
 +
 +  private void init(String methodName, Configuration conf, HTableDescriptor htd,
 +      HColumnDescriptor hcd, boolean testStore) throws IOException {
 +    //Setting up tje Region and Store
 +    Path basedir = new Path(DIR+methodName);
 +    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
 +    String logName = "logs";
 +    Path logdir = new Path(basedir, logName);
 +    FileSystem fs = FileSystem.get(conf);
 +    fs.delete(logdir, true);
 +
 +    htd.addFamily(hcd);
 +    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
 +
 +    final Configuration walConf = new Configuration(conf);
 +    FSUtils.setRootDir(walConf, basedir);
 +    final WALFactory wals = new WALFactory(walConf, null, methodName);
 +    region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
 +            info, htd, null);
 +    store = new HMobStore(region, hcd, conf);
 +    if(testStore) {
 +      init(conf, hcd);
 +    }
 +  }
 +
 +  private void init(Configuration conf, HColumnDescriptor hcd)
 +      throws IOException {
 +    Path basedir = FSUtils.getRootDir(conf);
 +    fs = FileSystem.get(conf);
 +    Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR
 +        + Bytes.toString(family));
 +    fs.mkdirs(homePath);
 +
 +    KeyValue key1 = new KeyValue(row, family, qf1, 1, value);
 +    KeyValue key2 = new KeyValue(row, family, qf2, 1, value);
 +    KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
 +    KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
 +    int maxKeyCount = keys.length;
 +    StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
-         hcd.getCompactionCompression(), region.getStartKey());
++        hcd.getCompactionCompression(), region.getRegionInfo().getStartKey());
 +    mobFilePath = mobWriter.getPath();
 +
 +    mobWriter.append(key1);
 +    mobWriter.append(key2);
 +    mobWriter.append(key3);
 +    mobWriter.close();
 +
 +    String targetPathName = MobUtils.formatDate(currentDate);
 +    byte[] referenceValue = Bytes.toBytes(targetPathName + Path.SEPARATOR + mobFilePath.getName());
 +    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName().getName());
 +    KeyValue kv1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, referenceValue);
 +    KeyValue kv2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, referenceValue);
 +    KeyValue kv3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, referenceValue);
 +    seekKey1 = MobUtils.createMobRefKeyValue(kv1, referenceValue, tableNameTag);
 +    seekKey2 = MobUtils.createMobRefKeyValue(kv2, referenceValue, tableNameTag);
 +    seekKey3 = MobUtils.createMobRefKeyValue(kv3, referenceValue, tableNameTag);
 +  }
 +
 +  /**
 +   * Getting data from memstore
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetFromMemStore() throws IOException {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +
 +    Scan scan = new Scan(get);
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +        scan.getFamilyMap().get(store.getFamily().getName()),
 +        0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      // Verify the values
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Getting MOB data from files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetFromFiles() throws IOException {
 +    final Configuration conf = TEST_UTIL.getConfiguration();
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +    //flush
 +    flush(3);
 +
 +    Scan scan = new Scan(get);
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +        scan.getFamilyMap().get(store.getFamily().getName()),
 +        0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Getting the reference data from files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetReferencesFromFiles() throws IOException {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +    //flush
 +    flush(3);
 +
 +    Scan scan = new Scan(get);
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +      scan.getFamilyMap().get(store.getFamily().getName()),
 +      0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Cell cell = results.get(i);
 +      Assert.assertTrue(MobUtils.isMobReferenceCell(cell));
 +    }
 +  }
 +
 +  /**
 +   * Getting data from memstore and files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetFromMemStoreAndFiles() throws IOException {
 +
 +    final Configuration conf = HBaseConfiguration.create();
 +
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +
 +    Scan scan = new Scan(get);
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +        scan.getFamilyMap().get(store.getFamily().getName()),
 +        0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Getting data from memstore and files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testMobCellSizeThreshold() throws IOException {
 +
 +    final Configuration conf = HBaseConfiguration.create();
 +
 +    HColumnDescriptor hcd;
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(100);
 +    hcd.setMaxVersions(4);
 +    init(name.getMethodName(), conf, hcd, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +    //flush
 +    flush(3);
 +
 +    Scan scan = new Scan(get);
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +      scan.getFamilyMap().get(store.getFamily().getName()),
 +      0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Cell cell = results.get(i);
 +      //this is not mob reference cell.
 +      Assert.assertFalse(MobUtils.isMobReferenceCell(cell));
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +      Assert.assertEquals(100, store.getFamily().getMobThreshold());
 +    }
 +  }
 +
 +  @Test
 +  public void testCommitFile() throws Exception {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, true);
 +    String targetPathName = MobUtils.formatDate(new Date());
 +    Path targetPath = new Path(store.getPath(), (targetPathName
 +        + Path.SEPARATOR + mobFilePath.getName()));
 +    fs.delete(targetPath, true);
 +    Assert.assertFalse(fs.exists(targetPath));
 +    //commit file
 +    store.commitFile(mobFilePath, targetPath);
 +    Assert.assertTrue(fs.exists(targetPath));
 +  }
 +
 +  @Test
 +  public void testResolve() throws Exception {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, true);
 +    String targetPathName = MobUtils.formatDate(currentDate);
 +    Path targetPath = new Path(store.getPath(), targetPathName);
 +    store.commitFile(mobFilePath, targetPath);
 +    //resolve
 +    Cell resultCell1 = store.resolve(seekKey1, false);
 +    Cell resultCell2 = store.resolve(seekKey2, false);
 +    Cell resultCell3 = store.resolve(seekKey3, false);
 +    //compare
 +    Assert.assertEquals(Bytes.toString(value),
 +        Bytes.toString(CellUtil.cloneValue(resultCell1)));
 +    Assert.assertEquals(Bytes.toString(value),
 +        Bytes.toString(CellUtil.cloneValue(resultCell2)));
 +    Assert.assertEquals(Bytes.toString(value2),
 +        Bytes.toString(CellUtil.cloneValue(resultCell3)));
 +  }
 +
 +  /**
 +   * Flush the memstore
 +   * @param storeFilesSize
 +   * @throws IOException
 +   */
 +  private void flush(int storeFilesSize) throws IOException{
 +    this.store.snapshot();
 +    flushStore(store, id++);
 +    Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
 +    Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
 +  }
 +
 +  /**
 +   * Flush the memstore
 +   * @param store
 +   * @param id
 +   * @throws IOException
 +   */
 +  private static void flushStore(HMobStore store, long id) throws IOException {
 +    StoreFlushContext storeFlushCtx = store.createFlushContext(id);
 +    storeFlushCtx.prepare();
 +    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
 +    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index d429de5,0000000..005bdfe
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@@ -1,467 -1,0 +1,466 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
 +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.client.Delete;
 +import org.apache.hadoop.hbase.client.Durability;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFile;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.Pair;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.junit.rules.TestName;
 +
 +/**
 + * Test mob compaction
 + */
 +@Category(MediumTests.class)
 +public class TestMobCompaction {
 +  @Rule
 +  public TestName name = new TestName();
 +  static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
 +  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
 +  private Configuration conf = null;
 +
 +  private HRegion region = null;
 +  private HTableDescriptor htd = null;
 +  private HColumnDescriptor hcd = null;
 +  private long mobCellThreshold = 1000;
 +
 +  private FileSystem fs;
 +
 +  private static final byte[] COLUMN_FAMILY = fam1;
 +  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
 +  private int compactionThreshold;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void init(Configuration conf, long mobThreshold) throws Exception {
 +    this.conf = conf;
 +    this.mobCellThreshold = mobThreshold;
 +    HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
 +
 +    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
 +    htd = UTIL.createTableDescriptor(name.getMethodName());
 +    hcd = new HColumnDescriptor(COLUMN_FAMILY);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(mobThreshold);
 +    hcd.setMaxVersions(1);
 +    htd.modifyFamily(hcd);
 +
 +    region = UTIL.createLocalHRegion(htd, null, null);
 +    fs = FileSystem.get(conf);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    region.close();
 +    fs.delete(UTIL.getDataTestDir(), true);
 +  }
 +
 +  /**
 +   * During compaction, cells smaller than the threshold won't be affected.
 +   */
 +  @Test
 +  public void testSmallerValue() throws Exception {
 +    init(UTIL.getConfiguration(), 500);
 +    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
 +    HRegionIncommon loader = new HRegionIncommon(region);
 +    // one hfile per row
 +    for (int i = 0; i < compactionThreshold; i++) {
 +      Put p = createPut(i, dummyData);
 +      loader.put(p);
 +      loader.flushcache();
 +    }
 +    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", 0, countMobFiles());
 +    assertEquals("Before compaction: rows", compactionThreshold, countRows());
 +    assertEquals("Before compaction: mob rows", 0, countMobRows());
 +
 +    region.compactStores();
 +
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    assertEquals("After compaction: mob file count", 0, countMobFiles());
 +    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
 +    assertEquals("After compaction: rows", compactionThreshold, countRows());
 +    assertEquals("After compaction: mob rows", 0, countMobRows());
 +  }
 +
 +  /**
 +   * During compaction, the mob threshold size is changed.
 +   */
 +  @Test
 +  public void testLargerValue() throws Exception {
 +    init(UTIL.getConfiguration(), 200);
 +    byte[] dummyData = makeDummyData(300); // larger than mob threshold
 +    HRegionIncommon loader = new HRegionIncommon(region);
 +    for (int i = 0; i < compactionThreshold; i++) {
 +      Put p = createPut(i, dummyData);
 +      loader.put(p);
 +      loader.flushcache();
 +    }
 +    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
 +    assertEquals("Before compaction: rows", compactionThreshold, countRows());
 +    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
 +    assertEquals("Before compaction: number of mob cells", compactionThreshold,
 +        countMobCellsInMetadata());
 +    // Change the threshold larger than the data size
 +    region.getTableDesc().getFamily(COLUMN_FAMILY).setMobThreshold(500);
 +    region.initialize();
 +    region.compactStores();
 +
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
 +    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
 +    assertEquals("After compaction: rows", compactionThreshold, countRows());
 +    assertEquals("After compaction: mob rows", 0, countMobRows());
 +  }
 +
 +  /**
 +   * This test will first generate store files, then bulk load them and trigger the compaction. When
 +   * compaction, the cell value will be larger than the threshold.
 +   */
 +  @Test
 +  public void testMobCompactionWithBulkload() throws Exception {
 +    // The following will produce store files of 600.
 +    init(UTIL.getConfiguration(), 300);
 +    byte[] dummyData = makeDummyData(600);
 +
 +    Path hbaseRootDir = FSUtils.getRootDir(conf);
 +    Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
-     List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
++    List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
 +    for (int i = 0; i < compactionThreshold; i++) {
 +      Path hpath = new Path(basedir, "hfile" + i);
 +      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
 +      createHFile(hpath, i, dummyData);
 +    }
 +
 +    // The following will bulk load the above generated store files and compact, with 600(fileSize)
 +    // > 300(threshold)
-     boolean result = region.bulkLoadHFiles(hfiles, true);
++    boolean result = region.bulkLoadHFiles(hfiles, true, null);
 +    assertTrue("Bulkload result:", result);
 +    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", 0, countMobFiles());
 +    assertEquals("Before compaction: rows", compactionThreshold, countRows());
 +    assertEquals("Before compaction: mob rows", 0, countMobRows());
 +    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
 +
 +    region.compactStores();
 +
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    assertEquals("After compaction: mob file count:", 1, countMobFiles());
 +    assertEquals("After compaction: rows", compactionThreshold, countRows());
 +    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
 +    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
 +    assertEquals("After compaction: number of mob cells", compactionThreshold,
 +        countMobCellsInMetadata());
 +  }
 +
 +  @Test
 +  public void testMajorCompactionAfterDelete() throws Exception {
 +    init(UTIL.getConfiguration(), 100);
 +    byte[] dummyData = makeDummyData(200); // larger than mob threshold
 +    HRegionIncommon loader = new HRegionIncommon(region);
 +    // create hfiles and mob hfiles but don't trigger compaction
 +    int numHfiles = compactionThreshold - 1;
 +    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
 +    for (int i = 0; i < numHfiles; i++) {
 +      Put p = createPut(i, dummyData);
 +      loader.put(p);
 +      loader.flushcache();
 +    }
 +    assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
 +    assertEquals("Before compaction: rows", numHfiles, countRows());
 +    assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
 +    assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
 +    // now let's delete some cells that contain mobs
 +    Delete delete = new Delete(deleteRow);
-     delete.deleteFamily(COLUMN_FAMILY);
++    delete.addFamily(COLUMN_FAMILY);
 +    region.delete(delete);
 +    loader.flushcache();
 +
 +    assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
 +    assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
-     region.compactStores(true);
++    // region.compactStores();
++    region.compact(true);
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    // still have original mob hfiles and now added a mob del file
 +    assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());
 +
 +    Scan scan = new Scan();
 +    scan.setRaw(true);
 +    InternalScanner scanner = region.getScanner(scan);
-     List<Cell> results = new ArrayList<Cell>();
++    List<Cell> results = new ArrayList<>();
 +    scanner.next(results);
 +    int deleteCount = 0;
 +    while (!results.isEmpty()) {
 +      for (Cell c : results) {
 +        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
 +          deleteCount++;
 +          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
 +        }
 +      }
 +      results.clear();
 +      scanner.next(results);
 +    }
 +    // assert the delete mark is not retained after the major compaction
 +    assertEquals(0, deleteCount);
 +    scanner.close();
 +    // assert the deleted cell is not counted
 +    assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
 +  }
 +
 +  private int countStoreFiles() throws IOException {
 +    Store store = region.getStore(COLUMN_FAMILY);
 +    return store.getStorefilesCount();
 +  }
 +
 +  private int countMobFiles() throws IOException {
 +    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
 +        hcd.getNameAsString());
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
 +      return files.length;
 +    }
 +    return 0;
 +  }
 +
 +  private long countMobCellsInMetadata() throws IOException {
 +    long mobCellsCount = 0;
 +    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
 +        hcd.getNameAsString());
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE);
 +        Map<byte[], byte[]> fileInfo = sf.createReader().loadFileInfo();
 +        byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
 +        assertTrue(count != null);
 +        mobCellsCount += Bytes.toLong(count);
 +      }
 +    }
 +    return mobCellsCount;
 +  }
 +
 +  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
 +    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
 +    p.setDurability(Durability.SKIP_WAL);
-     p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
++    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
 +    return p;
 +  }
 +
 +  /**
 +   * Create an HFile with the given number of bytes
 +   */
 +  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
 +    HFileContext meta = new HFileContextBuilder().build();
 +    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
 +        .withFileContext(meta).create();
 +    long now = System.currentTimeMillis();
 +    try {
 +      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
 +          Bytes.toBytes("colX"), now, dummyData);
 +      writer.append(kv);
 +    } finally {
 +      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
 +      writer.close();
 +    }
 +  }
 +
 +  private int countMobRows() throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = region.getScanner(scan);
 +
 +    int scannedCount = 0;
-     List<Cell> results = new ArrayList<Cell>();
++    List<Cell> results = new ArrayList<>();
 +    boolean hasMore = true;
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      for (Cell c : results) {
 +        if (MobUtils.isMobReferenceCell(c)) {
 +          scannedCount++;
 +        }
 +      }
 +      results.clear();
 +    }
 +    scanner.close();
 +
 +    return scannedCount;
 +  }
 +
 +  private int countRows() throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    InternalScanner scanner = region.getScanner(scan);
 +
 +    int scannedCount = 0;
 +    List<Cell> results = new ArrayList<Cell>();
 +    boolean hasMore = true;
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      scannedCount += results.size();
 +      results.clear();
 +    }
 +    scanner.close();
 +
 +    return scannedCount;
 +  }
 +
 +  private byte[] makeDummyData(int size) {
 +    byte[] dummyData = new byte[size];
 +    new Random().nextBytes(dummyData);
 +    return dummyData;
 +  }
 +
 +  private int countReferencedMobFiles() throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = region.getScanner(scan);
 +
-     List<Cell> kvs = new ArrayList<Cell>();
++    List<Cell> kvs = new ArrayList<>();
 +    boolean hasMore = true;
 +    String fileName;
-     Set<String> files = new HashSet<String>();
++    Set<String> files = new HashSet<>();
 +    do {
 +      kvs.clear();
 +      hasMore = scanner.next(kvs);
-       for (Cell c : kvs) {
-         KeyValue kv = KeyValueUtil.ensureKeyValue(c);
++      for (Cell kv : kvs) {
 +        if (!MobUtils.isMobReferenceCell(kv)) {
 +          continue;
 +        }
 +        if (!MobUtils.hasValidMobRefCellValue(kv)) {
 +          continue;
 +        }
 +        int size = MobUtils.getMobValueLength(kv);
 +        if (size <= mobCellThreshold) {
 +          continue;
 +        }
 +        fileName = MobUtils.getMobFileName(kv);
 +        if (fileName.isEmpty()) {
 +          continue;
 +        }
 +        files.add(fileName);
 +        Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
 +            hcd.getNameAsString());
 +        assertTrue(fs.exists(new Path(familyPath, fileName)));
 +      }
 +    } while (hasMore);
 +
 +    scanner.close();
 +
 +    return files.size();
 +  }
 +
 +  private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException {
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
 +    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
 +        hcd.getNameAsString());
-     List<StoreFile> sfs = new ArrayList<StoreFile>();
++    List<StoreFile> sfs = new ArrayList<>();
 +    int numDelfiles = 0;
 +    int size = 0;
 +    if (fs.exists(mobDirPath)) {
 +      for (FileStatus f : fs.listStatus(mobDirPath)) {
 +        StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE);
 +        sfs.add(sf);
 +        if (StoreFileInfo.isDelFile(sf.getPath())) {
 +          numDelfiles++;
 +        }
 +      }
 +      List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null,
 +          HConstants.LATEST_TIMESTAMP);
 +      Scan scan = new Scan();
 +      scan.setMaxVersions(hcd.getMaxVersions());
 +      long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
 +      long ttl = HStore.determineTTLFromFamily(hcd);
 +      ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
 +      StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
 +          scanners, 0L, HConstants.LATEST_TIMESTAMP);
 +      List<Cell> results = new ArrayList<>();
 +      boolean hasMore = true;
 +      while (hasMore) {
 +        hasMore = scanner.next(results);
 +        size += results.size();
 +        results.clear();
 +      }
 +    }
 +    // assert the number of the existing del files
 +    assertEquals(expectedNumDelfiles, numDelfiles);
 +    return size;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 27a0b06,0000000..3b5a474
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@@ -1,410 -1,0 +1,411 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.Random;
 +
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobStoreScanner {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static byte [] row1 = Bytes.toBytes("row1");
 +  private final static byte [] family = Bytes.toBytes("family");
 +  private final static byte [] qf1 = Bytes.toBytes("qualifier1");
 +  private final static byte [] qf2 = Bytes.toBytes("qualifier2");
 +  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
-   private static HTable table;
++  private static Table table;
 +  private static HBaseAdmin admin;
 +  private static HColumnDescriptor hcd;
 +  private static HTableDescriptor desc;
 +  private static Random random = new Random();
 +  private static long defaultThreshold = 10;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024);
 +
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  public void setUp(long threshold, String TN) throws Exception {
 +    desc = new HTableDescriptor(TableName.valueOf(TN));
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(threshold);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
-     admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
++    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), TN);
++    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++            .getTable(TableName.valueOf(TN));
 +  }
 +
 +  /**
 +   * Generate the mob value.
 +   *
 +   * @param size the size of the value
 +   * @return the mob value generated
 +   */
 +  private static byte[] generateMobValue(int size) {
 +    byte[] mobVal = new byte[size];
 +    random.nextBytes(mobVal);
 +    return mobVal;
 +  }
 +
 +  /**
 +   * Set the scan attribute
 +   *
 +   * @param reversed if true, scan will be backward order
 +   * @param mobScanRaw if true, scan will get the mob reference
 +   * @return this
 +   */
 +  public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
 +    scan.setReversed(reversed);
 +    scan.setMaxVersions(4);
 +    if(mobScanRaw) {
 +      scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    }
 +  }
 +
 +  @Test
 +  public void testMobStoreScanner() throws Exception {
 +	  testGetFromFiles(false);
 +	  testGetFromMemStore(false);
 +    testGetReferences(false);
 +    testMobThreshold(false);
 +    testGetFromArchive(false);
 +  }
 +
 +  @Test
 +  public void testReversedMobStoreScanner() throws Exception {
 +	  testGetFromFiles(true);
 +	  testGetFromMemStore(true);
 +    testGetReferences(true);
 +    testMobThreshold(true);
 +    testGetFromArchive(true);
 +  }
 +
 +  @Test(timeout=60000)
 +  public void testGetMassive() throws Exception {
 +    String TN = "testGetMassive";
 +    setUp(defaultThreshold, TN);
 +
 +    // Put some data 5 10, 15, 20  mb ok  (this would be right below protobuf default max size of 64MB.
 +    // 25, 30, 40 fail.  these is above protobuf max size of 64MB
 +    byte[] bigValue = new byte[25*1024*1024];
 +
 +    Put put = new Put(row1);
-     put.add(family, qf1, bigValue);
-     put.add(family, qf2, bigValue);
-     put.add(family, qf3, bigValue);
++    put.addColumn(family, qf1, bigValue);
++    put.addColumn(family, qf2, bigValue);
++    put.addColumn(family, qf3, bigValue);
 +    table.put(put);
 +
 +    Get g = new Get(row1);
 +    Result r = table.get(g);
 +    // should not have blown up.
 +  }
 +
 +  public void testGetFromFiles(boolean reversed) throws Exception {
 +    String TN = "testGetFromFiles" + reversed;
++    TableName tn = TableName.valueOf(TN);
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);
 +
 +    Put put1 = new Put(row1);
-     put1.add(family, qf1, ts3, value);
-     put1.add(family, qf2, ts2, value);
-     put1.add(family, qf3, ts1, value);
++    put1.addColumn(family, qf1, ts3, value);
++    put1.addColumn(family, qf2, ts2, value);
++    put1.addColumn(family, qf3, ts1, value);
 +    table.put(put1);
 +
-     table.flushCommits();
-     admin.flush(TN);
++    admin.flush(tn);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, false);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  public void testGetFromMemStore(boolean reversed) throws Exception {
 +    String TN = "testGetFromMemStore" + reversed;
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);;
 +
 +    Put put1 = new Put(row1);
-     put1.add(family, qf1, ts3, value);
-     put1.add(family, qf2, ts2, value);
-     put1.add(family, qf3, ts1, value);
++    put1.addColumn(family, qf1, ts3, value);
++    put1.addColumn(family, qf2, ts2, value);
++    put1.addColumn(family, qf3, ts1, value);
 +    table.put(put1);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, false);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  public void testGetReferences(boolean reversed) throws Exception {
 +    String TN = "testGetReferences" + reversed;
++    TableName tn = TableName.valueOf(TN);
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);;
 +
 +    Put put1 = new Put(row1);
-     put1.add(family, qf1, ts3, value);
-     put1.add(family, qf2, ts2, value);
-     put1.add(family, qf3, ts1, value);
++    put1.addColumn(family, qf1, ts3, value);
++    put1.addColumn(family, qf2, ts2, value);
++    put1.addColumn(family, qf3, ts1, value);
 +    table.put(put1);
 +
-     table.flushCommits();
-     admin.flush(TN);
++    admin.flush(tn);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, true);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        assertIsMobReference(cell, row1, family, value, TN);
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  public void testMobThreshold(boolean reversed) throws Exception {
 +    String TN = "testMobThreshold" + reversed;
++    TableName tn = TableName.valueOf(TN);
 +    setUp(defaultThreshold, TN);
 +    byte [] valueLess = generateMobValue((int)defaultThreshold-1);
 +    byte [] valueEqual = generateMobValue((int)defaultThreshold);
 +    byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +
 +    Put put1 = new Put(row1);
-     put1.add(family, qf1, ts3, valueLess);
-     put1.add(family, qf2, ts2, valueEqual);
-     put1.add(family, qf3, ts1, valueGreater);
++    put1.addColumn(family, qf1, ts3, valueLess);
++    put1.addColumn(family, qf2, ts2, valueEqual);
++    put1.addColumn(family, qf3, ts1, valueGreater);
 +    table.put(put1);
 +
-     table.flushCommits();
-     admin.flush(TN);
++    admin.flush(tn);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, true);
 +
 +    Cell cellLess= null;
 +    Cell cellEqual = null;
 +    Cell cellGreater = null;
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
 +        if(qf.equals(Bytes.toString(qf1))) {
 +          cellLess = cell;
 +        }
 +        if(qf.equals(Bytes.toString(qf2))) {
 +          cellEqual = cell;
 +        }
 +        if(qf.equals(Bytes.toString(qf3))) {
 +          cellGreater = cell;
 +        }
 +        count++;
 +      }
 +    }
 +    Assert.assertEquals(3, count);
 +    assertNotMobReference(cellLess, row1, family, valueLess);
 +    assertNotMobReference(cellEqual, row1, family, valueEqual);
 +    assertIsMobReference(cellGreater, row1, family, valueGreater, TN);
 +    results.close();
 +  }
 +
 +  public void testGetFromArchive(boolean reversed) throws Exception {
 +    String TN = "testGetFromArchive" + reversed;
++    TableName tn = TableName.valueOf(TN);
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);;
 +    // Put some data
 +    Put put1 = new Put(row1);
-     put1.add(family, qf1, ts3, value);
-     put1.add(family, qf2, ts2, value);
-     put1.add(family, qf3, ts1, value);
++    put1.addColumn(family, qf1, ts3, value);
++    put1.addColumn(family, qf2, ts2, value);
++    put1.addColumn(family, qf3, ts1, value);
 +    table.put(put1);
 +
-     table.flushCommits();
-     admin.flush(TN);
++    admin.flush(tn);
 +
 +    // Get the files in the mob path
 +    Path mobFamilyPath;
 +    mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
 +        TableName.valueOf(TN)), hcd.getNameAsString());
 +    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
 +    FileStatus[] files = fs.listStatus(mobFamilyPath);
 +
 +    // Get the archive path
 +    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
 +    Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN));
 +    HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN));
 +    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
 +        regionInfo, tableDir, family);
 +
 +    // Move the files from mob path to archive path
 +    fs.mkdirs(storeArchiveDir);
 +    int fileCount = 0;
 +    for(FileStatus file : files) {
 +      fileCount++;
 +      Path filePath = file.getPath();
 +      Path src = new Path(mobFamilyPath, filePath.getName());
 +      Path dst = new Path(storeArchiveDir, filePath.getName());
 +      fs.rename(src, dst);
 +    }
 +
 +    // Verify the moving success
 +    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
 +    Assert.assertEquals(0, files1.length);
 +    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
 +    Assert.assertEquals(fileCount, files2.length);
 +
 +    // Scan from archive
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, false);
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  /**
 +   * Assert the value is not store in mob.
 +   */
 +  private static void assertNotMobReference(Cell cell, byte[] row, byte[] family,
 +      byte[] value) throws IOException {
 +    Assert.assertEquals(Bytes.toString(row),
 +        Bytes.toString(CellUtil.cloneRow(cell)));
 +    Assert.assertEquals(Bytes.toString(family),
 +        Bytes.toString(CellUtil.cloneFamily(cell)));
 +    Assert.assertTrue(Bytes.toString(value).equals(
 +        Bytes.toString(CellUtil.cloneValue(cell))));
 +  }
 +
 +  /**
 +   * Assert the value is store in mob.
 +   */
 +  private static void assertIsMobReference(Cell cell, byte[] row, byte[] family,
 +      byte[] value, String TN) throws IOException {
 +    Assert.assertEquals(Bytes.toString(row),
 +        Bytes.toString(CellUtil.cloneRow(cell)));
 +    Assert.assertEquals(Bytes.toString(family),
 +        Bytes.toString(CellUtil.cloneFamily(cell)));
 +    Assert.assertFalse(Bytes.toString(value).equals(
 +        Bytes.toString(CellUtil.cloneValue(cell))));
 +    byte[] referenceValue = CellUtil.cloneValue(cell);
 +    String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
 +        referenceValue.length - Bytes.SIZEOF_INT);
 +    int valLen = Bytes.toInt(referenceValue, 0, Bytes.SIZEOF_INT);
 +    Assert.assertEquals(value.length, valLen);
 +    Path mobFamilyPath;
 +    mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
 +        TableName.valueOf(TN)), hcd.getNameAsString());
 +    Path targetPath = new Path(mobFamilyPath, fileName);
 +    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
 +    Assert.assertTrue(fs.exists(targetPath));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index 5714351,5498d66..fa634d1
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@@ -360,70 -413,13 +413,77 @@@ public class TestRegionServerMetrics 
              "_table_"+tableNameString +
              "_region_" + i.getEncodedName()+
              "_metric";
-         metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg);
+         metricsHelper.assertCounter(prefix + "_scanNextNumOps", NUM_SCAN_NEXT, agg);
        }
+       metricsHelper.assertCounter("ScanNext_num_ops", numScanNext, serverSource);
+     }
+     try (Admin admin = TEST_UTIL.getHBaseAdmin()) {
+       admin.disableTable(tableName);
+       admin.deleteTable(tableName);
      }
    }
 +
 +  @Test
 +  public void testMobMetrics() throws IOException, InterruptedException {
 +    String tableNameString = "testMobMetrics";
 +    TableName tableName = TableName.valueOf(tableNameString);
 +    byte[] cf = Bytes.toBytes("d");
 +    byte[] qualifier = Bytes.toBytes("qual");
 +    byte[] val = Bytes.toBytes("mobdata");
 +    int numHfiles = conf.getInt("hbase.hstore.compactionThreshold", 3) - 1;
 +    HTableDescriptor htd = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(cf);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(0);
 +    htd.addFamily(hcd);
 +    HBaseAdmin admin = new HBaseAdmin(conf);
 +    HTable t = TEST_UTIL.createTable(htd, new byte[0][0], conf);
-     HRegion region = rs.getOnlineRegions(tableName).get(0);
++    Region region = rs.getOnlineRegions(tableName).get(0);
 +    t.setAutoFlush(true, true);
 +    for (int insertCount = 0; insertCount < numHfiles; insertCount++) {
 +      Put p = new Put(Bytes.toBytes(insertCount));
 +      p.add(cf, qualifier, val);
 +      t.put(p);
 +      admin.flush(tableName);
 +    }
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    metricsHelper.assertCounter("mobFlushCount", numHfiles, serverSource);
 +    Scan scan = new Scan(Bytes.toBytes(0), Bytes.toBytes(2));
 +    ResultScanner scanner = t.getScanner(scan);
 +    scanner.next(100);
++    numScanNext++;  // this is an ugly construct
 +    scanner.close();
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    metricsHelper.assertCounter("mobScanCellsCount", 2, serverSource);
 +    region.getTableDesc().getFamily(cf).setMobThreshold(100);
-     region.initialize();
-     region.compactStores(true);
++    ((HRegion)region).initialize();
++    region.compact(true);
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    metricsHelper.assertCounter("mobCompactedFromMobCellsCount", numHfiles,
 +        serverSource);
 +    metricsHelper.assertCounter("mobCompactedIntoMobCellsCount", 0, serverSource);
 +    scanner = t.getScanner(scan);
 +    scanner.next(100);
++    numScanNext++;  // this is an ugly construct
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    // metrics are reset by the region initialization
 +    metricsHelper.assertCounter("mobScanCellsCount", 0, serverSource);
 +    for (int insertCount = numHfiles;
 +        insertCount < 2 * numHfiles - 1; insertCount++) {
 +      Put p = new Put(Bytes.toBytes(insertCount));
 +      p.add(cf, qualifier, val);
 +      t.put(p);
 +      admin.flush(tableName);
 +    }
 +    region.getTableDesc().getFamily(cf).setMobThreshold(0);
-     region.initialize();
-     region.compactStores(true);
++    ((HRegion)region).initialize();
++    region.compact(true);
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    // metrics are reset by the region initialization
 +    metricsHelper.assertCounter("mobCompactedFromMobCellsCount", 0, serverSource);
 +    metricsHelper.assertCounter("mobCompactedIntoMobCellsCount", 2 * numHfiles - 1,
 +        serverSource);
 +    t.close();
 +    admin.close();
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
index da39f59,1125d11..349ec1c
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
@@@ -59,5 -63,25 +63,25 @@@ public class TestStoreFileInfo 
        assertFalse("should not be a valid link: " + name, HFileLink.isHFileLink(name));
      }
    }
+ 
+   @Test
+   public void testEqualsWithLink() throws IOException {
+     Path origin = new Path("/origin");
+     Path tmp = new Path("/tmp");
++    Path mob = new Path("/mob");
+     Path archive = new Path("/archive");
+     HFileLink link1 = new HFileLink(new Path(origin, "f1"), new Path(tmp, "f1"),
 -      new Path(archive, "f1"));
++      new Path(mob, "f1"), new Path(archive, "f1"));
+     HFileLink link2 = new HFileLink(new Path(origin, "f1"), new Path(tmp, "f1"),
 -      new Path(archive, "f1"));
 -
++      new Path(mob, "f1"), new Path(archive, "f1"));
+ 
+     StoreFileInfo info1 = new StoreFileInfo(TEST_UTIL.getConfiguration(),
+       TEST_UTIL.getTestFileSystem(), null, link1);
+     StoreFileInfo info2 = new StoreFileInfo(TEST_UTIL.getConfiguration(),
+       TEST_UTIL.getTestFileSystem(), null, link2);
+ 
+     assertEquals(info1, info2);
+     assertEquals(info1.hashCode(), info2.hashCode());
+   }
  }