Posted to commits@hbase.apache.org by jm...@apache.org on 2015/02/22 21:56:17 UTC

[44/50] [abbrv] hbase git commit: Merge branch 'master' (2/11/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
index a64b7d6,0000000..028e602
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
@@@ -1,225 -1,0 +1,225 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.MediumTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.HBaseAdmin;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.client.ResultScanner;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestDeleteMobTable {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static byte[] FAMILY = Bytes.toBytes("family");
 +  private final static byte[] QF = Bytes.toBytes("qualifier");
 +  private static Random random = new Random();
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  /**
 +   * Generate the mob value.
 +   *
 +   * @param size
 +   *          the size of the value
 +   * @return the mob value generated
 +   */
 +  private static byte[] generateMobValue(int size) {
 +    byte[] mobVal = new byte[size];
 +    random.nextBytes(mobVal);
 +    return mobVal;
 +  }
 +
 +  @Test
 +  public void testDeleteMobTable() throws Exception {
 +    byte[] tableName = Bytes.toBytes("testDeleteMobTable");
 +    TableName tn = TableName.valueOf(tableName);
 +    HTableDescriptor htd = new HTableDescriptor(tn);
 +    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(0);
 +    htd.addFamily(hcd);
 +    HBaseAdmin admin = null;
 +    HTable table = null;
 +    try {
 +      admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
 +      admin.createTable(htd);
 +      table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +      byte[] value = generateMobValue(10);
 +
 +      byte[] row = Bytes.toBytes("row");
 +      Put put = new Put(row);
 +      put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
 +      table.put(put);
 +
 +      table.flushCommits();
 +      admin.flush(tableName);
 +
 +      // the mob file exists
 +      Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
 +      Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
 +      Assert.assertTrue(mobTableDirExist(tn));
 +      table.close();
 +
 +      admin.disableTable(tn);
 +      admin.deleteTable(tn);
 +
 +      Assert.assertFalse(admin.tableExists(tn));
 +      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
 +      Assert.assertFalse(mobTableDirExist(tn));
 +    } finally {
 +      if (admin != null) {
 +        admin.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testDeleteNonMobTable() throws Exception {
 +    byte[] tableName = Bytes.toBytes("testDeleteNonMobTable");
 +    TableName tn = TableName.valueOf(tableName);
 +    HTableDescriptor htd = new HTableDescriptor(tn);
 +    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
 +    htd.addFamily(hcd);
 +    HBaseAdmin admin = null;
 +    HTable table = null;
 +    try {
 +      admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
 +      admin.createTable(htd);
 +      table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +      byte[] value = generateMobValue(10);
 +
 +      byte[] row = Bytes.toBytes("row");
 +      Put put = new Put(row);
 +      put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
 +      table.put(put);
 +
 +      table.flushCommits();
 +      admin.flush(tableName);
 +      table.close();
 +
 +      // the mob file doesn't exist
 +      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertFalse(mobTableDirExist(tn));
 +
 +      admin.disableTable(tn);
 +      admin.deleteTable(tn);
 +
 +      Assert.assertFalse(admin.tableExists(tn));
 +      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
 +      Assert.assertFalse(mobTableDirExist(tn));
 +    } finally {
 +      if (admin != null) {
 +        admin.close();
 +      }
 +    }
 +  }
 +
 +  private int countMobFiles(TableName tn, String familyName) throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName);
 +    if (fs.exists(mobFileDir)) {
 +      return fs.listStatus(mobFileDir).length;
 +    } else {
 +      return 0;
 +    }
 +  }
 +
 +  private int countArchiveMobFiles(TableName tn, String familyName)
 +      throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
 +        MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
 +    if (fs.exists(storePath)) {
 +      return fs.listStatus(storePath).length;
 +    } else {
 +      return 0;
 +    }
 +  }
 +
 +  private boolean mobTableDirExist(TableName tn) throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
 +    return fs.exists(tableDir);
 +  }
 +
 +  private boolean mobArchiveExist(TableName tn, String familyName, String fileName)
 +      throws IOException {
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
 +        MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
 +    return fs.exists(new Path(storePath, fileName));
 +  }
 +
 +  private String assertHasOneMobRow(HTable table, TableName tn, String familyName)
 +      throws IOException {
 +    Scan scan = new Scan();
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    ResultScanner rs = table.getScanner(scan);
 +    Result r = rs.next();
 +    Assert.assertNotNull(r);
 +    byte[] value = r.getValue(FAMILY, QF);
 +    String fileName = Bytes.toString(value, Bytes.SIZEOF_INT, value.length - Bytes.SIZEOF_INT);
 +    Path filePath = new Path(
 +        MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName);
 +    FileSystem fs = TEST_UTIL.getTestFileSystem();
 +    Assert.assertTrue(fs.exists(filePath));
 +    r = rs.next();
 +    Assert.assertNull(r);
 +    return fileName;
 +  }
 +}
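
The assertions above hinge on where MOB files live before and after the table is dropped: live files sit under the table's MOB family directory, and after the delete they are expected under the store archive directory of the table's synthetic MOB region. A minimal sketch of the two path lookups that countMobFiles/countArchiveMobFiles rely on, using the imports already present in the test above plus org.apache.hadoop.conf.Configuration (the table name and family name here are illustrative placeholders, not values taken from the commit):

    Configuration conf = TEST_UTIL.getConfiguration();   // assumes a running HBaseTestingUtility
    TableName tn = TableName.valueOf("someMobTable");    // illustrative name
    String family = "family";                            // illustrative family

    // Live MOB files of the family:
    Path mobFamilyPath = MobUtils.getMobFamilyPath(conf, tn, family);

    // Where the same files are expected once the table is deleted and its files are archived,
    // keyed by the encoded name of the synthetic MOB region:
    Path mobArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn,
        MobUtils.getMobRegionInfo(tn).getEncodedName(), family);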

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 30b74d4,0000000..5f76655
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@@ -1,471 -1,0 +1,475 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NavigableSet;
 +import java.util.concurrent.ConcurrentSkipListSet;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseConfiguration;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.MediumTests;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Get;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.monitoring.MonitoredTask;
- import org.apache.hadoop.hbase.regionserver.wal.HLog;
- import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
++import org.apache.hadoop.hbase.wal.WAL;
++import org.apache.hadoop.hbase.wal.WALFactory;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.junit.rules.TestName;
 +import org.mockito.Mockito;
 +
 +@Category(MediumTests.class)
 +public class TestHMobStore {
 +  public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
 +  @Rule public TestName name = new TestName();
 +
 +  private HMobStore store;
 +  private HRegion region;
 +  private HColumnDescriptor hcd;
 +  private FileSystem fs;
 +  private byte [] table = Bytes.toBytes("table");
 +  private byte [] family = Bytes.toBytes("family");
 +  private byte [] row = Bytes.toBytes("row");
 +  private byte [] row2 = Bytes.toBytes("row2");
 +  private byte [] qf1 = Bytes.toBytes("qf1");
 +  private byte [] qf2 = Bytes.toBytes("qf2");
 +  private byte [] qf3 = Bytes.toBytes("qf3");
 +  private byte [] qf4 = Bytes.toBytes("qf4");
 +  private byte [] qf5 = Bytes.toBytes("qf5");
 +  private byte [] qf6 = Bytes.toBytes("qf6");
 +  private byte[] value = Bytes.toBytes("value");
 +  private byte[] value2 = Bytes.toBytes("value2");
 +  private Path mobFilePath;
 +  private Date currentDate = new Date();
 +  private KeyValue seekKey1;
 +  private KeyValue seekKey2;
 +  private KeyValue seekKey3;
 +  private NavigableSet<byte[]> qualifiers =
 +    new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
 +  private List<Cell> expected = new ArrayList<Cell>();
 +  private long id = System.currentTimeMillis();
 +  private Get get = new Get(row);
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
 +
 +  /**
 +   * Setup
 +   * @throws Exception
 +   */
 +  @Before
 +  public void setUp() throws Exception {
 +    qualifiers.add(qf1);
 +    qualifiers.add(qf3);
 +    qualifiers.add(qf5);
 +
 +    Iterator<byte[]> iter = qualifiers.iterator();
 +    while(iter.hasNext()){
 +      byte [] next = iter.next();
 +      expected.add(new KeyValue(row, family, next, 1, value));
 +      get.addColumn(family, next);
 +      get.setMaxVersions(); // all versions.
 +    }
 +  }
 +
 +  private void init(String methodName, Configuration conf, boolean testStore)
 +  throws IOException {
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    init(methodName, conf, hcd, testStore);
 +  }
 +
 +  private void init(String methodName, Configuration conf,
 +      HColumnDescriptor hcd, boolean testStore) throws IOException {
 +    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
 +    init(methodName, conf, htd, hcd, testStore);
 +  }
 +
 +  private void init(String methodName, Configuration conf, HTableDescriptor htd,
 +      HColumnDescriptor hcd, boolean testStore) throws IOException {
 +    //Setting up the Region and Store
 +    Path basedir = new Path(DIR+methodName);
 +    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
 +    String logName = "logs";
 +    Path logdir = new Path(basedir, logName);
 +    FileSystem fs = FileSystem.get(conf);
 +    fs.delete(logdir, true);
 +
 +    htd.addFamily(hcd);
 +    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-     HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
-     region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
++
++    final Configuration walConf = new Configuration(conf);
++    FSUtils.setRootDir(walConf, basedir);
++    final WALFactory wals = new WALFactory(walConf, null, methodName);
++    region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
++            info, htd, null);
 +    store = new HMobStore(region, hcd, conf);
 +    if(testStore) {
 +      init(conf, hcd);
 +    }
 +  }
 +
 +  private void init(Configuration conf, HColumnDescriptor hcd)
 +      throws IOException {
 +    Path basedir = FSUtils.getRootDir(conf);
 +    fs = FileSystem.get(conf);
 +    Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR
 +        + Bytes.toString(family));
 +    fs.mkdirs(homePath);
 +
 +    KeyValue key1 = new KeyValue(row, family, qf1, 1, value);
 +    KeyValue key2 = new KeyValue(row, family, qf2, 1, value);
 +    KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
 +    KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
 +    int maxKeyCount = keys.length;
 +    StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate,
 +        maxKeyCount, hcd.getCompactionCompression(), region.getStartKey());
 +    mobFilePath = mobWriter.getPath();
 +
 +    mobWriter.append(key1);
 +    mobWriter.append(key2);
 +    mobWriter.append(key3);
 +    mobWriter.close();
 +
 +    int valueLength1 = key1.getValueLength();
 +    int valueLength2 = key2.getValueLength();
 +    int valueLength3 = key3.getValueLength();
 +
 +    String targetPathName = MobUtils.formatDate(currentDate);
 +    byte[] referenceValue =
 +            Bytes.toBytes(targetPathName + Path.SEPARATOR
 +                + mobFilePath.getName());
 +    byte[] newReferenceValue1 = Bytes.add(Bytes.toBytes(valueLength1), referenceValue);
 +    byte[] newReferenceValue2 = Bytes.add(Bytes.toBytes(valueLength2), referenceValue);
 +    byte[] newReferenceValue3 = Bytes.add(Bytes.toBytes(valueLength3), referenceValue);
 +    seekKey1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, newReferenceValue1);
 +    seekKey2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, newReferenceValue2);
 +    seekKey3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, newReferenceValue3);
 +  }
 +
 +  /**
 +   * Getting data from memstore
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetFromMemStore() throws IOException {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +
 +    Scan scan = new Scan(get);
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +        scan.getFamilyMap().get(store.getFamily().getName()),
 +        0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      // Verify the values
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Getting MOB data from files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetFromFiles() throws IOException {
 +    final Configuration conf = TEST_UTIL.getConfiguration();
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +    //flush
 +    flush(3);
 +
 +    Scan scan = new Scan(get);
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +        scan.getFamilyMap().get(store.getFamily().getName()),
 +        0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Getting the reference data from files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetReferencesFromFiles() throws IOException {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +    //flush
 +    flush(3);
 +
 +    Scan scan = new Scan(get);
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +      scan.getFamilyMap().get(store.getFamily().getName()),
 +      0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Cell cell = results.get(i);
 +      Assert.assertTrue(MobUtils.isMobReferenceCell(cell));
 +    }
 +  }
 +
 +  /**
 +   * Getting data from memstore and files
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testGetFromMemStoreAndFiles() throws IOException {
 +
 +    final Configuration conf = HBaseConfiguration.create();
 +
 +    init(name.getMethodName(), conf, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +
 +    Scan scan = new Scan(get);
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +        scan.getFamilyMap().get(store.getFamily().getName()),
 +        0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Getting data when the cell sizes are below the mob threshold
 +   * @throws IOException
 +   */
 +  @Test
 +  public void testMobCellSizeThreshold() throws IOException {
 +
 +    final Configuration conf = HBaseConfiguration.create();
 +
 +    HColumnDescriptor hcd;
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(100);
 +    hcd.setMaxVersions(4);
 +    init(name.getMethodName(), conf, hcd, false);
 +
 +    //Put data in memstore
 +    this.store.add(new KeyValue(row, family, qf1, 1, value));
 +    this.store.add(new KeyValue(row, family, qf2, 1, value));
 +    //flush
 +    flush(1);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf3, 1, value));
 +    this.store.add(new KeyValue(row, family, qf4, 1, value));
 +    //flush
 +    flush(2);
 +
 +    //Add more data
 +    this.store.add(new KeyValue(row, family, qf5, 1, value));
 +    this.store.add(new KeyValue(row, family, qf6, 1, value));
 +    //flush
 +    flush(3);
 +
 +    Scan scan = new Scan(get);
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
 +      scan.getFamilyMap().get(store.getFamily().getName()),
 +      0);
 +
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    Collections.sort(results, KeyValue.COMPARATOR);
 +    scanner.close();
 +
 +    //Compare
 +    Assert.assertEquals(expected.size(), results.size());
 +    for(int i=0; i<results.size(); i++) {
 +      Cell cell = results.get(i);
 +      //this is not a mob reference cell.
 +      Assert.assertFalse(MobUtils.isMobReferenceCell(cell));
 +      Assert.assertEquals(expected.get(i), results.get(i));
 +      Assert.assertEquals(100, store.getFamily().getMobThreshold());
 +    }
 +  }
 +
 +  @Test
 +  public void testCommitFile() throws Exception {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, true);
 +    String targetPathName = MobUtils.formatDate(new Date());
 +    Path targetPath = new Path(store.getPath(), (targetPathName
 +        + Path.SEPARATOR + mobFilePath.getName()));
 +    fs.delete(targetPath, true);
 +    Assert.assertFalse(fs.exists(targetPath));
 +    //commit file
 +    store.commitFile(mobFilePath, targetPath);
 +    Assert.assertTrue(fs.exists(targetPath));
 +  }
 +
 +  @Test
 +  public void testResolve() throws Exception {
 +    final Configuration conf = HBaseConfiguration.create();
 +    init(name.getMethodName(), conf, true);
 +    String targetPathName = MobUtils.formatDate(currentDate);
 +    Path targetPath = new Path(store.getPath(), targetPathName);
 +    store.commitFile(mobFilePath, targetPath);
 +    //resolve
 +    Cell resultCell1 = store.resolve(seekKey1, false);
 +    Cell resultCell2 = store.resolve(seekKey2, false);
 +    Cell resultCell3 = store.resolve(seekKey3, false);
 +    //compare
 +    Assert.assertEquals(Bytes.toString(value),
 +        Bytes.toString(CellUtil.cloneValue(resultCell1)));
 +    Assert.assertEquals(Bytes.toString(value),
 +        Bytes.toString(CellUtil.cloneValue(resultCell2)));
 +    Assert.assertEquals(Bytes.toString(value2),
 +        Bytes.toString(CellUtil.cloneValue(resultCell3)));
 +  }
 +
 +  /**
 +   * Flush the memstore and verify the resulting store file count
 +   * @param storeFilesSize the expected number of store files after the flush
 +   * @throws IOException
 +   */
 +  private void flush(int storeFilesSize) throws IOException{
 +    this.store.snapshot();
 +    flushStore(store, id++);
 +    Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
-     Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).kvset.size());
++    Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
 +  }
 +
 +  /**
 +   * Flush the memstore
 +   * @param store
 +   * @param id
 +   * @throws IOException
 +   */
 +  private static void flushStore(HMobStore store, long id) throws IOException {
 +    StoreFlushContext storeFlushCtx = store.createFlushContext(id);
 +    storeFlushCtx.prepare();
 +    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
 +    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
 +  }
 +}
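
The seek keys assembled in init(Configuration, HColumnDescriptor) above also document the layout of a MOB reference cell value: a 4-byte length of the real value followed by the relative location (date directory plus MOB file name) of the file that holds it, which is why assertHasOneMobRow in TestDeleteMobTable skips Bytes.SIZEOF_INT bytes before reading the file name. A condensed sketch of that round trip, reusing the local names from init(...) above (nothing here goes beyond what the test already constructs):

    // build: [4-byte value length][<date dir>/<mob file name>]
    String targetPathName = MobUtils.formatDate(currentDate);
    byte[] referenceValue = Bytes.toBytes(targetPathName + Path.SEPARATOR + mobFilePath.getName());
    byte[] refCellValue = Bytes.add(Bytes.toBytes(key1.getValueLength()), referenceValue);

    // read back: the first SIZEOF_INT bytes carry the length, the rest names the file location
    int mobValueLength = Bytes.toInt(refCellValue);
    String mobFileLocation = Bytes.toString(refCellValue, Bytes.SIZEOF_INT,
        refCellValue.length - Bytes.SIZEOF_INT);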

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index 2d68cd1,0000000..d429de5
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@@ -1,467 -1,0 +1,467 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
 +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
- import org.apache.hadoop.hbase.MediumTests;
 +import org.apache.hadoop.hbase.client.Delete;
 +import org.apache.hadoop.hbase.client.Durability;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFile;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.Pair;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.junit.rules.TestName;
 +
 +/**
 + * Test mob compaction
 + */
 +@Category(MediumTests.class)
 +public class TestMobCompaction {
 +  @Rule
 +  public TestName name = new TestName();
 +  static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
 +  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
 +  private Configuration conf = null;
 +
 +  private HRegion region = null;
 +  private HTableDescriptor htd = null;
 +  private HColumnDescriptor hcd = null;
 +  private long mobCellThreshold = 1000;
 +
 +  private FileSystem fs;
 +
 +  private static final byte[] COLUMN_FAMILY = fam1;
 +  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
 +  private int compactionThreshold;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void init(Configuration conf, long mobThreshold) throws Exception {
 +    this.conf = conf;
 +    this.mobCellThreshold = mobThreshold;
 +    HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
 +
 +    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
 +    htd = UTIL.createTableDescriptor(name.getMethodName());
 +    hcd = new HColumnDescriptor(COLUMN_FAMILY);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(mobThreshold);
 +    hcd.setMaxVersions(1);
-     htd.addFamily(hcd);
++    htd.modifyFamily(hcd);
 +
 +    region = UTIL.createLocalHRegion(htd, null, null);
 +    fs = FileSystem.get(conf);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    region.close();
 +    fs.delete(UTIL.getDataTestDir(), true);
 +  }
 +
 +  /**
 +   * During compaction, cells smaller than the threshold won't be affected.
 +   */
 +  @Test
 +  public void testSmallerValue() throws Exception {
 +    init(UTIL.getConfiguration(), 500);
 +    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
 +    HRegionIncommon loader = new HRegionIncommon(region);
 +    // one hfile per row
 +    for (int i = 0; i < compactionThreshold; i++) {
 +      Put p = createPut(i, dummyData);
 +      loader.put(p);
 +      loader.flushcache();
 +    }
 +    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", 0, countMobFiles());
 +    assertEquals("Before compaction: rows", compactionThreshold, countRows());
 +    assertEquals("Before compaction: mob rows", 0, countMobRows());
 +
 +    region.compactStores();
 +
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    assertEquals("After compaction: mob file count", 0, countMobFiles());
 +    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
 +    assertEquals("After compaction: rows", compactionThreshold, countRows());
 +    assertEquals("After compaction: mob rows", 0, countMobRows());
 +  }
 +
 +  /**
 +   * The mob threshold is raised above the cell size before compaction, so the compacted cells are no longer mob references.
 +   */
 +  @Test
 +  public void testLargerValue() throws Exception {
 +    init(UTIL.getConfiguration(), 200);
 +    byte[] dummyData = makeDummyData(300); // larger than mob threshold
 +    HRegionIncommon loader = new HRegionIncommon(region);
 +    for (int i = 0; i < compactionThreshold; i++) {
 +      Put p = createPut(i, dummyData);
 +      loader.put(p);
 +      loader.flushcache();
 +    }
 +    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
 +    assertEquals("Before compaction: rows", compactionThreshold, countRows());
 +    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
 +    assertEquals("Before compaction: number of mob cells", compactionThreshold,
 +        countMobCellsInMetadata());
 +    // Change the threshold larger than the data size
 +    region.getTableDesc().getFamily(COLUMN_FAMILY).setMobThreshold(500);
 +    region.initialize();
 +    region.compactStores();
 +
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
 +    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
 +    assertEquals("After compaction: rows", compactionThreshold, countRows());
 +    assertEquals("After compaction: mob rows", 0, countMobRows());
 +  }
 +
 +  /**
 +   * This test first generates store files, then bulk loads them and triggers a compaction.
 +   * During the compaction, the cell values are larger than the threshold.
 +   */
 +  @Test
 +  public void testMobCompactionWithBulkload() throws Exception {
 +    // The following will produce store files with 600-byte cell values.
 +    init(UTIL.getConfiguration(), 300);
 +    byte[] dummyData = makeDummyData(600);
 +
 +    Path hbaseRootDir = FSUtils.getRootDir(conf);
 +    Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
 +    List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
 +    for (int i = 0; i < compactionThreshold; i++) {
 +      Path hpath = new Path(basedir, "hfile" + i);
 +      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
 +      createHFile(hpath, i, dummyData);
 +    }
 +
 +    // Bulk load the store files generated above and compact; the 600-byte values exceed
 +    // the 300-byte threshold.
 +    boolean result = region.bulkLoadHFiles(hfiles, true);
 +    assertTrue("Bulkload result:", result);
 +    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", 0, countMobFiles());
 +    assertEquals("Before compaction: rows", compactionThreshold, countRows());
 +    assertEquals("Before compaction: mob rows", 0, countMobRows());
 +    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
 +
 +    region.compactStores();
 +
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    assertEquals("After compaction: mob file count:", 1, countMobFiles());
 +    assertEquals("After compaction: rows", compactionThreshold, countRows());
 +    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
 +    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
 +    assertEquals("After compaction: number of mob cells", compactionThreshold,
 +        countMobCellsInMetadata());
 +  }
 +
 +  @Test
 +  public void testMajorCompactionAfterDelete() throws Exception {
 +    init(UTIL.getConfiguration(), 100);
 +    byte[] dummyData = makeDummyData(200); // larger than mob threshold
 +    HRegionIncommon loader = new HRegionIncommon(region);
 +    // create hfiles and mob hfiles but don't trigger compaction
 +    int numHfiles = compactionThreshold - 1;
 +    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
 +    for (int i = 0; i < numHfiles; i++) {
 +      Put p = createPut(i, dummyData);
 +      loader.put(p);
 +      loader.flushcache();
 +    }
 +    assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
 +    assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
 +    assertEquals("Before compaction: rows", numHfiles, countRows());
 +    assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
 +    assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
 +    // now let's delete some cells that contain mobs
 +    Delete delete = new Delete(deleteRow);
 +    delete.deleteFamily(COLUMN_FAMILY);
 +    region.delete(delete);
 +    loader.flushcache();
 +
 +    assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
 +    assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
 +    region.compactStores(true);
 +    assertEquals("After compaction: store files", 1, countStoreFiles());
 +    // still have original mob hfiles and now added a mob del file
 +    assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());
 +
 +    Scan scan = new Scan();
 +    scan.setRaw(true);
 +    InternalScanner scanner = region.getScanner(scan);
 +    List<Cell> results = new ArrayList<Cell>();
 +    scanner.next(results);
 +    int deleteCount = 0;
 +    while (!results.isEmpty()) {
 +      for (Cell c : results) {
 +        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
 +          deleteCount++;
 +          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
 +        }
 +      }
 +      results.clear();
 +      scanner.next(results);
 +    }
 +    // assert the delete mark is not retained after the major compaction
 +    assertEquals(0, deleteCount);
 +    scanner.close();
 +    // assert the deleted cell is not counted
 +    assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
 +  }
 +
 +  private int countStoreFiles() throws IOException {
 +    Store store = region.getStore(COLUMN_FAMILY);
 +    return store.getStorefilesCount();
 +  }
 +
 +  private int countMobFiles() throws IOException {
 +    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
 +        hcd.getNameAsString());
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
 +      return files.length;
 +    }
 +    return 0;
 +  }
 +
 +  private long countMobCellsInMetadata() throws IOException {
 +    long mobCellsCount = 0;
 +    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
 +        hcd.getNameAsString());
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
 +    if (fs.exists(mobDirPath)) {
 +      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
 +      for (FileStatus file : files) {
 +        StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE);
 +        Map<byte[], byte[]> fileInfo = sf.createReader().loadFileInfo();
 +        byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
 +        assertTrue(count != null);
 +        mobCellsCount += Bytes.toLong(count);
 +      }
 +    }
 +    return mobCellsCount;
 +  }
 +
 +  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
 +    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
 +    p.setDurability(Durability.SKIP_WAL);
 +    p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
 +    return p;
 +  }
 +
 +  /**
 +   * Create an HFile containing a single cell with the given dummy data
 +   */
 +  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
 +    HFileContext meta = new HFileContextBuilder().build();
 +    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
 +        .withFileContext(meta).create();
 +    long now = System.currentTimeMillis();
 +    try {
 +      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
 +          Bytes.toBytes("colX"), now, dummyData);
 +      writer.append(kv);
 +    } finally {
 +      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
 +      writer.close();
 +    }
 +  }
 +
 +  private int countMobRows() throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = region.getScanner(scan);
 +
 +    int scannedCount = 0;
 +    List<Cell> results = new ArrayList<Cell>();
 +    boolean hasMore = true;
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      for (Cell c : results) {
 +        if (MobUtils.isMobReferenceCell(c)) {
 +          scannedCount++;
 +        }
 +      }
 +      results.clear();
 +    }
 +    scanner.close();
 +
 +    return scannedCount;
 +  }
 +
 +  private int countRows() throws IOException {
 +    Scan scan = new Scan();
 +    // Scan with default settings; mob values are resolved as in a normal read
 +    InternalScanner scanner = region.getScanner(scan);
 +
 +    int scannedCount = 0;
 +    List<Cell> results = new ArrayList<Cell>();
 +    boolean hasMore = true;
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      scannedCount += results.size();
 +      results.clear();
 +    }
 +    scanner.close();
 +
 +    return scannedCount;
 +  }
 +
 +  private byte[] makeDummyData(int size) {
 +    byte[] dummyData = new byte[size];
 +    new Random().nextBytes(dummyData);
 +    return dummyData;
 +  }
 +
 +  private int countReferencedMobFiles() throws IOException {
 +    Scan scan = new Scan();
 +    // Do not retrieve the mob data when scanning
 +    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    InternalScanner scanner = region.getScanner(scan);
 +
 +    List<Cell> kvs = new ArrayList<Cell>();
 +    boolean hasMore = true;
 +    String fileName;
 +    Set<String> files = new HashSet<String>();
 +    do {
 +      kvs.clear();
 +      hasMore = scanner.next(kvs);
 +      for (Cell c : kvs) {
 +        KeyValue kv = KeyValueUtil.ensureKeyValue(c);
 +        if (!MobUtils.isMobReferenceCell(kv)) {
 +          continue;
 +        }
 +        if (!MobUtils.hasValidMobRefCellValue(kv)) {
 +          continue;
 +        }
 +        int size = MobUtils.getMobValueLength(kv);
 +        if (size <= mobCellThreshold) {
 +          continue;
 +        }
 +        fileName = MobUtils.getMobFileName(kv);
 +        if (fileName.isEmpty()) {
 +          continue;
 +        }
 +        files.add(fileName);
 +        Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
 +            hcd.getNameAsString());
 +        assertTrue(fs.exists(new Path(familyPath, fileName)));
 +      }
 +    } while (hasMore);
 +
 +    scanner.close();
 +
 +    return files.size();
 +  }
 +
 +  private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException {
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
 +    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
 +        hcd.getNameAsString());
 +    List<StoreFile> sfs = new ArrayList<StoreFile>();
 +    int numDelfiles = 0;
 +    int size = 0;
 +    if (fs.exists(mobDirPath)) {
 +      for (FileStatus f : fs.listStatus(mobDirPath)) {
 +        StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE);
 +        sfs.add(sf);
 +        if (StoreFileInfo.isDelFile(sf.getPath())) {
 +          numDelfiles++;
 +        }
 +      }
 +      List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null,
 +          HConstants.LATEST_TIMESTAMP);
 +      Scan scan = new Scan();
 +      scan.setMaxVersions(hcd.getMaxVersions());
 +      long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
 +      long ttl = HStore.determineTTLFromFamily(hcd);
 +      ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
 +      StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
 +          scanners, 0L, HConstants.LATEST_TIMESTAMP);
 +      List<Cell> results = new ArrayList<>();
 +      boolean hasMore = true;
 +      while (hasMore) {
 +        hasMore = scanner.next(results);
 +        size += results.size();
 +        results.clear();
 +      }
 +    }
 +    // assert the number of existing del files
 +    assertEquals(expectedNumDelfiles, numDelfiles);
 +    return size;
 +  }
 +}
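
Both the counting helpers above and the scanner tests below lean on the same raw-MOB scan switch: setting the MobConstants.MOB_SCAN_RAW attribute makes a scan return the reference cells instead of resolving the real values, and MobUtils.isMobReferenceCell then distinguishes references from inlined cells. A client-side sketch of that pattern, assuming `table` is an HTable opened against a MOB-enabled table as in the tests (the loop body is only a placeholder):

    Scan scan = new Scan();
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); // keep references unresolved
    ResultScanner rs = table.getScanner(scan);
    try {
      for (Result r : rs) {
        for (Cell cell : r.rawCells()) {
          if (MobUtils.isMobReferenceCell(cell)) {
            // the cell value is the [length][file location] reference, not the stored data
          }
        }
      }
    } finally {
      rs.close();
    }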

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index bb73dba,0000000..1112b12
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@@ -1,393 -1,0 +1,393 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.Random;
 +
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.MediumTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.HBaseAdmin;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.client.ResultScanner;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobStoreScanner {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static byte [] row1 = Bytes.toBytes("row1");
 +  private final static byte [] family = Bytes.toBytes("family");
 +  private final static byte [] qf1 = Bytes.toBytes("qualifier1");
 +  private final static byte [] qf2 = Bytes.toBytes("qualifier2");
 +  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
 +  private static HTable table;
 +  private static HBaseAdmin admin;
 +  private static HColumnDescriptor hcd;
 +  private static HTableDescriptor desc;
 +  private static Random random = new Random();
 +  private static long defaultThreshold = 10;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  public void setUp(long threshold, String TN) throws Exception {
 +    desc = new HTableDescriptor(TableName.valueOf(TN));
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(threshold);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +    admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
 +    admin.createTable(desc);
 +    table = new HTable(TEST_UTIL.getConfiguration(), TN);
 +  }
 +
 +  /**
 +   * Generate the mob value.
 +   *
 +   * @param size the size of the value
 +   * @return the mob value generated
 +   */
 +  private static byte[] generateMobValue(int size) {
 +    byte[] mobVal = new byte[size];
 +    random.nextBytes(mobVal);
 +    return mobVal;
 +  }
 +
 +  /**
 +   * Set the scan attributes
 +   *
 +   * @param reversed if true, the scan is performed in reverse order
 +   * @param mobScanRaw if true, the scan returns the raw mob reference cells
 +   */
 +  public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
 +    scan.setReversed(reversed);
 +    scan.setMaxVersions(4);
 +    if(mobScanRaw) {
 +      scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
 +    }
 +  }
 +
 +  @Test
 +  public void testMobStoreScanner() throws Exception {
 +    testGetFromFiles(false);
 +    testGetFromMemStore(false);
 +    testGetReferences(false);
 +    testMobThreshold(false);
 +    testGetFromArchive(false);
 +  }
 +
 +  @Test
 +  public void testReversedMobStoreScanner() throws Exception {
 +    testGetFromFiles(true);
 +    testGetFromMemStore(true);
 +    testGetReferences(true);
 +    testMobThreshold(true);
 +    testGetFromArchive(true);
 +  }
 +
 +  public void testGetFromFiles(boolean reversed) throws Exception {
 +    String TN = "testGetFromFiles" + reversed;
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);
 +
 +    Put put1 = new Put(row1);
 +    put1.add(family, qf1, ts3, value);
 +    put1.add(family, qf2, ts2, value);
 +    put1.add(family, qf3, ts1, value);
 +    table.put(put1);
 +
 +    table.flushCommits();
 +    admin.flush(TN);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, false);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  public void testGetFromMemStore(boolean reversed) throws Exception {
 +    String TN = "testGetFromMemStore" + reversed;
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);
 +
 +    Put put1 = new Put(row1);
 +    put1.add(family, qf1, ts3, value);
 +    put1.add(family, qf2, ts2, value);
 +    put1.add(family, qf3, ts1, value);
 +    table.put(put1);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, false);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  public void testGetReferences(boolean reversed) throws Exception {
 +    String TN = "testGetReferences" + reversed;
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);
 +
 +    Put put1 = new Put(row1);
 +    put1.add(family, qf1, ts3, value);
 +    put1.add(family, qf2, ts2, value);
 +    put1.add(family, qf3, ts1, value);
 +    table.put(put1);
 +
 +    table.flushCommits();
 +    admin.flush(TN);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, true);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        assertIsMobReference(cell, row1, family, value, TN);
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  public void testMobThreshold(boolean reversed) throws Exception {
 +    String TN = "testMobThreshold" + reversed;
 +    setUp(defaultThreshold, TN);
 +    byte [] valueLess = generateMobValue((int)defaultThreshold-1);
 +    byte [] valueEqual = generateMobValue((int)defaultThreshold);
 +    byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +
 +    Put put1 = new Put(row1);
 +    put1.add(family, qf1, ts3, valueLess);
 +    put1.add(family, qf2, ts2, valueEqual);
 +    put1.add(family, qf3, ts1, valueGreater);
 +    table.put(put1);
 +
 +    table.flushCommits();
 +    admin.flush(TN);
 +
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, true);
 +
 +    Cell cellLess = null;
 +    Cell cellEqual = null;
 +    Cell cellGreater = null;
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
 +        if(qf.equals(Bytes.toString(qf1))) {
 +          cellLess = cell;
 +        }
 +        if(qf.equals(Bytes.toString(qf2))) {
 +          cellEqual = cell;
 +        }
 +        if(qf.equals(Bytes.toString(qf3))) {
 +          cellGreater = cell;
 +        }
 +        count++;
 +      }
 +    }
 +    Assert.assertEquals(3, count);
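 +    // Only the value strictly larger than the MOB threshold should have been
 +    // written to a MOB file; values at or below the threshold stay inline.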
 +    assertNotMobReference(cellLess, row1, family, valueLess);
 +    assertNotMobReference(cellEqual, row1, family, valueEqual);
 +    assertIsMobReference(cellGreater, row1, family, valueGreater, TN);
 +    results.close();
 +  }
 +
 +  public void testGetFromArchive(boolean reversed) throws Exception {
 +    String TN = "testGetFromArchive" + reversed;
 +    setUp(defaultThreshold, TN);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte [] value = generateMobValue((int)defaultThreshold+1);
 +    // Put some data
 +    Put put1 = new Put(row1);
 +    put1.add(family, qf1, ts3, value);
 +    put1.add(family, qf2, ts2, value);
 +    put1.add(family, qf3, ts1, value);
 +    table.put(put1);
 +
 +    table.flushCommits();
 +    admin.flush(TN);
 +
 +    // Get the files in the mob path
 +    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
 +        TableName.valueOf(TN)), hcd.getNameAsString());
 +    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
 +    FileStatus[] files = fs.listStatus(mobFamilyPath);
 +
 +    // Get the archive path
 +    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
 +    Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN));
 +    HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN));
 +    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
 +        regionInfo, tableDir, family);
 +
 +    // Move the files from mob path to archive path
 +    fs.mkdirs(storeArchiveDir);
 +    int fileCount = 0;
 +    for(FileStatus file : files) {
 +      fileCount++;
 +      Path filePath = file.getPath();
 +      Path src = new Path(mobFamilyPath, filePath.getName());
 +      Path dst = new Path(storeArchiveDir, filePath.getName());
 +      fs.rename(src, dst);
 +    }
 +
 +    // Verify that all files were moved to the archive
 +    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
 +    Assert.assertEquals(0, files1.length);
 +    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
 +    Assert.assertEquals(fileCount, files2.length);
 +
 +    // Scan from archive
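 +    // The MOB scanner is expected to fall back to the archive location when a
 +    // file is missing from the mob family directory, so the values should
 +    // still be readable after the renames above.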
 +    Scan scan = new Scan();
 +    setScan(scan, reversed, false);
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +
 +  /**
 +   * Assert that the cell's value was stored inline rather than as a MOB
 +   * reference.
 +   */
 +  private static void assertNotMobReference(Cell cell, byte[] row, byte[] family,
 +      byte[] value) throws IOException {
 +    Assert.assertEquals(Bytes.toString(row),
 +        Bytes.toString(CellUtil.cloneRow(cell)));
 +    Assert.assertEquals(Bytes.toString(family),
 +        Bytes.toString(CellUtil.cloneFamily(cell)));
 +    Assert.assertEquals(Bytes.toString(value),
 +        Bytes.toString(CellUtil.cloneValue(cell)));
 +  }
 +
 +  /**
 +   * Assert that the cell's value is a MOB reference and that the referenced
 +   * MOB file exists.
 +   */
 +  private static void assertIsMobReference(Cell cell, byte[] row, byte[] family,
 +      byte[] value, String TN) throws IOException {
 +    Assert.assertEquals(Bytes.toString(row),
 +        Bytes.toString(CellUtil.cloneRow(cell)));
 +    Assert.assertEquals(Bytes.toString(family),
 +        Bytes.toString(CellUtil.cloneFamily(cell)));
 +    Assert.assertFalse(Bytes.toString(value).equals(
 +        Bytes.toString(CellUtil.cloneValue(cell))));
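 +    // A MOB reference value is laid out as a 4-byte int holding the length of
 +    // the real value, followed by the name of the MOB file that contains it.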
 +    byte[] referenceValue = CellUtil.cloneValue(cell);
 +    String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
 +        referenceValue.length - Bytes.SIZEOF_INT);
 +    int valLen = Bytes.toInt(referenceValue, 0, Bytes.SIZEOF_INT);
 +    Assert.assertEquals(value.length, valLen);
 +    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
 +        TableName.valueOf(TN)), hcd.getNameAsString());
 +    Path targetPath = new Path(mobFamilyPath, fileName);
 +    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
 +    Assert.assertTrue(fs.exists(targetPath));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index 48eba8e,aa071ef..5714351
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@@ -350,92 -333,35 +333,97 @@@ public class TestRegionServerMetrics 
      for (int insertCount =0; insertCount < 100; insertCount++) {
        Put p = new Put(Bytes.toBytes("" + insertCount + "row"));
        p.add(cf, qualifier, val);
-       t.put(p);
+       puts.add(p);
      }
-     t.flushCommits();
- 
-     Scan s = new Scan();
-     s.setBatch(1);
-     s.setCaching(1);
-     ResultScanner resultScanners = t.getScanner(s);
- 
-     for (int nextCount = 0; nextCount < 30; nextCount++) {
-       Result result = resultScanners.next();
-       assertNotNull(result);
-       assertEquals(1, result.size());
+     try (HTable t = TEST_UTIL.createTable(tableName, cf)) {
+       t.put(puts);
+ 
+       Scan s = new Scan();
+       s.setBatch(1);
+       s.setCaching(1);
+       ResultScanner resultScanners = t.getScanner(s);
+ 
+       for (int nextCount = 0; nextCount < 30; nextCount++) {
+         Result result = resultScanners.next();
+         assertNotNull(result);
+         assertEquals(1, result.size());
+       }
      }
-     for ( HRegionInfo i:t.getRegionLocations().keySet()) {
-       MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
-           .getMetrics()
-           .getSource()
-           .getAggregateSource();
-       String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
-           "_table_"+tableNameString +
-           "_region_" + i.getEncodedName()+
-           "_metric";
-       metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg);
+     try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
+       for ( HRegionLocation location: locator.getAllRegionLocations()) {
+         HRegionInfo i = location.getRegionInfo();
+         MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
+             .getMetrics()
+             .getSource()
+             .getAggregateSource();
+         String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
+             "_table_"+tableNameString +
+             "_region_" + i.getEncodedName()+
+             "_metric";
+         metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg);
+       }
      }
    }
 +
 +  @Test
 +  public void testMobMetrics() throws IOException, InterruptedException {
 +    String tableNameString = "testMobMetrics";
 +    TableName tableName = TableName.valueOf(tableNameString);
 +    byte[] cf = Bytes.toBytes("d");
 +    byte[] qualifier = Bytes.toBytes("qual");
 +    byte[] val = Bytes.toBytes("mobdata");
 +    int numHfiles = conf.getInt("hbase.hstore.compactionThreshold", 3) - 1;
 +    HTableDescriptor htd = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(cf);
 +    hcd.setMobEnabled(true);
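 +    // With a MOB threshold of 0, every flushed cell is expected to land in a
 +    // MOB file, so each flush below should increment mobFlushCount.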
 +    hcd.setMobThreshold(0);
 +    htd.addFamily(hcd);
 +    HBaseAdmin admin = new HBaseAdmin(conf);
 +    HTable t = TEST_UTIL.createTable(htd, new byte[0][0], conf);
 +    HRegion region = rs.getOnlineRegions(tableName).get(0);
 +    t.setAutoFlush(true, true);
 +    for (int insertCount = 0; insertCount < numHfiles; insertCount++) {
 +      Put p = new Put(Bytes.toBytes(insertCount));
 +      p.add(cf, qualifier, val);
 +      t.put(p);
 +      admin.flush(tableName);
 +    }
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    metricsHelper.assertCounter("mobFlushCount", numHfiles, serverSource);
 +    Scan scan = new Scan(Bytes.toBytes(0), Bytes.toBytes(2));
 +    ResultScanner scanner = t.getScanner(scan);
 +    scanner.next(100);
 +    scanner.close();
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    metricsHelper.assertCounter("mobScanCellsCount", 2, serverSource);
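 +    // Raise the threshold above the value size and major-compact: the cells
 +    // are expected to be rewritten out of the MOB files, which is what
 +    // mobCompactedFromMobCellsCount tracks.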
 +    region.getTableDesc().getFamily(cf).setMobThreshold(100);
 +    region.initialize();
 +    region.compactStores(true);
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    metricsHelper.assertCounter("mobCompactedFromMobCellsCount", numHfiles,
 +        serverSource);
 +    metricsHelper.assertCounter("mobCompactedIntoMobCellsCount", 0, serverSource);
 +    scanner = t.getScanner(scan);
 +    scanner.next(100);
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    // metrics are reset by the region initialization
 +    metricsHelper.assertCounter("mobScanCellsCount", 0, serverSource);
 +    for (int insertCount = numHfiles;
 +        insertCount < 2 * numHfiles - 1; insertCount++) {
 +      Put p = new Put(Bytes.toBytes(insertCount));
 +      p.add(cf, qualifier, val);
 +      t.put(p);
 +      admin.flush(tableName);
 +    }
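 +    // Drop the threshold back to 0 and compact again: all cells, old and new,
 +    // should be written back into MOB files, which is reflected in
 +    // mobCompactedIntoMobCellsCount below.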
 +    region.getTableDesc().getFamily(cf).setMobThreshold(0);
 +    region.initialize();
 +    region.compactStores(true);
 +    metricsRegionServer.getRegionServerWrapper().forceRecompute();
 +    // metrics are reset by the region initialization
 +    metricsHelper.assertCounter("mobCompactedFromMobCellsCount", 0, serverSource);
 +    metricsHelper.assertCounter("mobCompactedIntoMobCellsCount", 2 * numHfiles - 1,
 +        serverSource);
 +    t.close();
 +    admin.close();
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java
index 2dcf83a,0000000..b7af75e
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java
@@@ -1,355 -1,0 +1,349 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.snapshot;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.IOException;
 +import java.util.List;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.CellUtil;
- import org.apache.hadoop.hbase.HBaseTestingUtility;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HRegionInfo;
- import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.*;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.client.ResultScanner;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 +import org.apache.hadoop.hbase.regionserver.BloomType;
 +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSTableDescriptors;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.junit.Assert;
 +
 +public class MobSnapshotTestingUtils {
 +
 +  /**
 +   * Create a pre-split MOB-enabled table with the given region replication.
 +   */
 +  public static void createMobTable(final HBaseTestingUtility util,
 +      final TableName tableName, int regionReplication,
 +      final byte[]... families) throws IOException, InterruptedException {
 +    HTableDescriptor htd = new HTableDescriptor(tableName);
 +    htd.setRegionReplication(regionReplication);
 +    for (byte[] family : families) {
 +      HColumnDescriptor hcd = new HColumnDescriptor(family);
 +      hcd.setMobEnabled(true);
 +      hcd.setMobThreshold(0L);
 +      htd.addFamily(hcd);
 +    }
 +    byte[][] splitKeys = SnapshotTestingUtils.getSplitKeys();
 +    util.getHBaseAdmin().createTable(htd, splitKeys);
 +    SnapshotTestingUtils.waitForTableToBeOnline(util, tableName);
 +    assertEquals((splitKeys.length + 1) * regionReplication, util
 +        .getHBaseAdmin().getTableRegions(tableName).size());
 +  }
 +
 +  /**
 +   * Create a MOB-enabled table.
 +   *
 +   * @param util the testing utility used to create the table
 +   * @param tableName the name of the table to create
 +   * @param families the column families, each created with MOB enabled
 +   * @return An HTable instance for the created table.
 +   * @throws IOException
 +   */
 +  public static HTable createMobTable(final HBaseTestingUtility util,
 +      final TableName tableName, final byte[]... families) throws IOException {
 +    HTableDescriptor htd = new HTableDescriptor(tableName);
 +    for (byte[] family : families) {
 +      HColumnDescriptor hcd = new HColumnDescriptor(family);
 +      // Disable blooms (they are on by default as of 0.95) because the tests
 +      // have hard-coded counts of what to expect in the block cache, etc.,
 +      // and the bloom filters interfere with those counts.
 +      hcd.setBloomFilterType(BloomType.NONE);
 +      hcd.setMobEnabled(true);
 +      hcd.setMobThreshold(0L);
 +      htd.addFamily(hcd);
 +    }
 +    util.getHBaseAdmin().createTable(htd);
 +    // HBaseAdmin only waits for the regions to appear in hbase:meta, so wait
 +    // until they are actually assigned.
 +    util.waitUntilAllRegionsAssigned(htd.getTableName());
 +    return new HTable(util.getConfiguration(), htd.getTableName());
 +  }
 +
 +  /**
 +   * Return the number of rows in the given table.
 +   */
 +  public static int countMobRows(final HTable table) throws IOException {
 +    Scan scan = new Scan();
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      count++;
 +      List<Cell> cells = res.listCells();
 +      for (Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
 +      }
 +    }
 +    results.close();
 +    return count;
 +  }
 +
 +  /**
 +   * Return the number of rows in the given table, scanning only the given
 +   * column families.
 +   */
 +  public static int countMobRows(final HTable table, final byte[]... families)
 +      throws IOException {
 +    Scan scan = new Scan();
 +    for (byte[] family : families) {
 +      scan.addFamily(family);
 +    }
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      count++;
 +      List<Cell> cells = res.listCells();
 +      for (Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
 +      }
 +    }
 +    results.close();
 +    return count;
 +  }
 +
 +  public static void verifyMobRowCount(final HBaseTestingUtility util,
 +      final TableName tableName, long expectedRows) throws IOException {
 +    HTable table = new HTable(util.getConfiguration(), tableName);
 +    try {
 +      assertEquals(expectedRows, countMobRows(table));
 +    } finally {
 +      table.close();
 +    }
 +  }
 +
 +  // ==========================================================================
 +  // Snapshot Mock
 +  // ==========================================================================
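 +  /**
 +   * Builds a fake MOB-enabled table layout directly on the file system (dummy
 +   * store files, no running cluster) and writes snapshot manifests against it,
 +   * so snapshot code paths can be exercised in isolation.
 +   */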
 +  public static class SnapshotMock {
 +    private final static String TEST_FAMILY = "cf";
 +    public final static int TEST_NUM_REGIONS = 4;
 +
 +    private final Configuration conf;
 +    private final FileSystem fs;
 +    private final Path rootDir;
 +
 +    static class RegionData {
 +      public HRegionInfo hri;
 +      public Path tableDir;
 +      public Path[] files;
 +
 +      public RegionData(final Path tableDir, final HRegionInfo hri,
 +          final int nfiles) {
 +        this.tableDir = tableDir;
 +        this.hri = hri;
 +        this.files = new Path[nfiles];
 +      }
 +    }
 +
 +    public static class SnapshotBuilder {
 +      private final RegionData[] tableRegions;
 +      private final SnapshotDescription desc;
 +      private final HTableDescriptor htd;
 +      private final Configuration conf;
 +      private final FileSystem fs;
 +      private final Path rootDir;
 +      private Path snapshotDir;
 +      private int snapshotted = 0;
 +
 +      public SnapshotBuilder(final Configuration conf, final FileSystem fs,
 +          final Path rootDir, final HTableDescriptor htd,
 +          final SnapshotDescription desc, final RegionData[] tableRegions)
 +          throws IOException {
 +        this.fs = fs;
 +        this.conf = conf;
 +        this.rootDir = rootDir;
 +        this.htd = htd;
 +        this.desc = desc;
 +        this.tableRegions = tableRegions;
 +        this.snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc,
 +            rootDir);
 +        new FSTableDescriptors(conf).createTableDescriptorForTableDirectory(
-             snapshotDir, htd, false);
++            snapshotDir, new TableDescriptor(htd), false);
 +      }
 +
 +      public HTableDescriptor getTableDescriptor() {
 +        return this.htd;
 +      }
 +
 +      public SnapshotDescription getSnapshotDescription() {
 +        return this.desc;
 +      }
 +
 +      public Path getSnapshotsDir() {
 +        return this.snapshotDir;
 +      }
 +
 +      public Path[] addRegion() throws IOException {
 +        return addRegion(desc);
 +      }
 +
 +      public Path[] addRegionV1() throws IOException {
 +        return addRegion(desc.toBuilder()
 +            .setVersion(SnapshotManifestV1.DESCRIPTOR_VERSION).build());
 +      }
 +
 +      public Path[] addRegionV2() throws IOException {
 +        return addRegion(desc.toBuilder()
 +            .setVersion(SnapshotManifestV2.DESCRIPTOR_VERSION).build());
 +      }
 +
 +      private Path[] addRegion(final SnapshotDescription desc)
 +          throws IOException {
 +        if (this.snapshotted == tableRegions.length) {
 +          throw new UnsupportedOperationException(
 +              "No more regions in the table");
 +        }
 +
 +        RegionData regionData = tableRegions[this.snapshotted++];
 +        ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(
 +            desc.getName());
 +        SnapshotManifest manifest = SnapshotManifest.create(conf, fs,
 +            snapshotDir, desc, monitor);
 +        manifest.addRegion(regionData.tableDir, regionData.hri);
 +        return regionData.files;
 +      }
 +
 +      public Path commit() throws IOException {
 +        ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(
 +            desc.getName());
 +        SnapshotManifest manifest = SnapshotManifest.create(conf, fs,
 +            snapshotDir, desc, monitor);
 +        manifest.addTableDescriptor(htd);
 +        manifest.consolidate();
 +        SnapshotDescriptionUtils.completeSnapshot(desc, rootDir, snapshotDir,
 +            fs);
 +        snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc,
 +            rootDir);
 +        return snapshotDir;
 +      }
 +    }
 +
 +    public SnapshotMock(final Configuration conf, final FileSystem fs,
 +        final Path rootDir) {
 +      this.fs = fs;
 +      this.conf = conf;
 +      this.rootDir = rootDir;
 +    }
 +
 +    public SnapshotBuilder createSnapshotV1(final String snapshotName)
 +        throws IOException {
 +      return createSnapshot(snapshotName, SnapshotManifestV1.DESCRIPTOR_VERSION);
 +    }
 +
 +    public SnapshotBuilder createSnapshotV2(final String snapshotName)
 +        throws IOException {
 +      return createSnapshot(snapshotName, SnapshotManifestV2.DESCRIPTOR_VERSION);
 +    }
 +
 +    private SnapshotBuilder createSnapshot(final String snapshotName,
 +        final int version) throws IOException {
 +      HTableDescriptor htd = createHtd(snapshotName);
 +
 +      RegionData[] regions = createTable(htd, TEST_NUM_REGIONS);
 +
 +      SnapshotDescription desc = SnapshotDescription.newBuilder()
 +          .setTable(htd.getNameAsString()).setName(snapshotName)
 +          .setVersion(version).build();
 +
 +      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc,
 +          rootDir);
 +      SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs);
 +      return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
 +    }
 +
 +    public HTableDescriptor createHtd(final String tableName) {
 +      HTableDescriptor htd = new HTableDescriptor(tableName);
 +      HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY);
 +      hcd.setMobEnabled(true);
 +      hcd.setMobThreshold(0L);
 +      htd.addFamily(hcd);
 +      return htd;
 +    }
 +
 +    private RegionData[] createTable(final HTableDescriptor htd,
 +        final int nregions) throws IOException {
 +      Path tableDir = FSUtils.getTableDir(rootDir, htd.getTableName());
 +      new FSTableDescriptors(conf).createTableDescriptorForTableDirectory(
-           tableDir, htd, false);
++          tableDir, new TableDescriptor(htd), false);
 +
 +      assertTrue(nregions % 2 == 0);
 +      RegionData[] regions = new RegionData[nregions];
 +      for (int i = 0; i < regions.length; i += 2) {
 +        byte[] startKey = Bytes.toBytes(0 + i * 2);
 +        byte[] endKey = Bytes.toBytes(1 + i * 2);
 +
 +        // First region, simple with one plain hfile.
 +        HRegionInfo hri = new HRegionInfo(htd.getTableName(), startKey, endKey);
 +        HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(
 +            conf, fs, tableDir, hri);
 +        regions[i] = new RegionData(tableDir, hri, 3);
 +        for (int j = 0; j < regions[i].files.length; ++j) {
 +          Path storeFile = createStoreFile(rfs.createTempName());
 +          regions[i].files[j] = rfs.commitStoreFile(TEST_FAMILY, storeFile);
 +        }
 +
 +        // Second region, used to test the split case.
 +        // This region contains a reference to the hfile in the first region.
 +        startKey = Bytes.toBytes(2 + i * 2);
 +        endKey = Bytes.toBytes(3 + i * 2);
 +        hri = new HRegionInfo(htd.getTableName());
 +        rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir,
 +            hri);
 +        regions[i + 1] = new RegionData(tableDir, hri, regions[i].files.length);
 +        for (int j = 0; j < regions[i].files.length; ++j) {
 +          String refName = regions[i].files[j].getName() + '.'
 +              + regions[i].hri.getEncodedName();
 +          Path refFile = createStoreFile(new Path(rootDir, refName));
 +          regions[i + 1].files[j] = rfs.commitStoreFile(TEST_FAMILY, refFile);
 +        }
 +      }
 +      return regions;
 +    }
 +
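 +    // The "store files" created here are dummies whose only content is their
 +    // own path string, which appears to be enough for the snapshot manifest
 +    // handling these tests exercise.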
 +    private Path createStoreFile(final Path storeFile) throws IOException {
 +      FSDataOutputStream out = fs.create(storeFile);
 +      try {
 +        out.write(Bytes.toBytes(storeFile.toString()));
 +      } finally {
 +        out.close();
 +      }
 +      return storeFile;
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
----------------------------------------------------------------------