You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/07/22 21:52:28 UTC

[10/50] [abbrv] hbase git commit: Merge branch 'apache/master' (4/16/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotFromClient.java
index 5bf5a30,0000000..b4c3aca
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotFromClient.java
@@@ -1,305 -1,0 +1,305 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.client;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.fail;
 +
 +import java.util.List;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.TableNotFoundException;
 +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 +import org.apache.hadoop.hbase.snapshot.SnapshotManifestV1;
 +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 +import org.apache.hadoop.hbase.testclassification.ClientTests;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +import com.google.common.collect.Lists;
 +
 +/**
 + * Test creating/using/deleting snapshots from the client
 + * <p>
 + * This is an end-to-end test for the snapshot utility
 + */
 +@Category({LargeTests.class, ClientTests.class})
 +public class TestMobSnapshotFromClient {
 +  // Logger bound to this test class so log output is attributed correctly.
 +  private static final Log LOG = LogFactory.getLog(TestMobSnapshotFromClient.class);
 +  // Shared testing utility that owns the mini cluster used by every test in this class.
 +  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 +  // Value passed to startMiniCluster when the cluster is brought up.
 +  private static final int NUM_RS = 2;
 +  // Name of the table created before each test, as a plain string.
 +  private static final String STRING_TABLE_NAME = "test";
 +  // Column family of the test table.
 +  protected static final byte[] TEST_FAM = Bytes.toBytes("fam");
 +  // TableName form of STRING_TABLE_NAME.
 +  protected static final TableName TABLE_NAME =
 +      TableName.valueOf(STRING_TABLE_NAME);
 +
 +  /**
 +   * Setup the config for the cluster
 +   * @throws Exception on failure
 +   */
 +  @BeforeClass
 +  public static void setupCluster() throws Exception {
 +    setupConf(UTIL.getConfiguration());
 +    UTIL.startMiniCluster(NUM_RS);
 +  }
 +
 +  /**
 +   * Applies the cluster settings this test relies on to the passed configuration:
 +   * info UI port, small memstore flush size, compaction and blocking store file
 +   * thresholds, snapshot support, a constant-size split policy and a zero-sized
 +   * MOB file cache.
 +   * @param conf the configuration to modify in place
 +   */
 +  private static void setupConf(Configuration conf) {
 +    // disable the ui (the key was previously misspelled "regionsever", so the
 +    // setting never reached the region servers)
 +    conf.setInt("hbase.regionserver.info.port", -1);
 +    // change the flush size to a small amount, regulating number of store files
 +    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
 +    // so make sure we get a compaction when doing a load, but keep around some
 +    // files in the store
 +    conf.setInt("hbase.hstore.compaction.min", 10);
 +    conf.setInt("hbase.hstore.compactionThreshold", 10);
 +    // block writes if we get to 12 store files
 +    conf.setInt("hbase.hstore.blockingStoreFiles", 12);
 +    // Enable snapshot
 +    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
 +    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
 +      ConstantSizeRegionSplitPolicy.class.getName());
 +    // set the MOB file cache size to zero
 +    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
 +  }
 +
 +  @Before
 +  public void setup() throws Exception {
 +    MobSnapshotTestingUtils.createMobTable(UTIL, TABLE_NAME, getNumReplicas(), TEST_FAM);
 +  }
 +
 +  protected int getNumReplicas() {
 +    return 1;
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    UTIL.deleteTable(TABLE_NAME);
 +    SnapshotTestingUtils.deleteAllSnapshots(UTIL.getHBaseAdmin());
 +    SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
 +  }
 +
 +  @AfterClass
 +  public static void cleanupTest() throws Exception {
 +    try {
 +      UTIL.shutdownMiniCluster();
 +    } catch (Exception e) {
 +      LOG.warn("failure shutting down cluster", e);
 +    }
 +  }
 +
 +  /**
 +   * Verifies that a request to snapshot the hbase:meta table is rejected with an
 +   * {@link IllegalArgumentException}.
 +   * @throws Exception on any unexpected failure
 +   */
 +  @Test (timeout=300000)
 +  public void testMetaTablesSnapshot() throws Exception {
 +    final byte[] metaSnapshot = Bytes.toBytes("metaSnapshot");
 +    final Admin admin = UTIL.getHBaseAdmin();
 +
 +    try {
 +      admin.snapshot(metaSnapshot, TableName.META_TABLE_NAME);
 +    } catch (IllegalArgumentException expected) {
 +      // the request was refused, which is what this test requires
 +      return;
 +    }
 +    fail("taking a snapshot of hbase:meta should not be allowed");
 +  }
 +
 +  /**
 +   * Test HBaseAdmin#deleteSnapshots(String) which deletes snapshots whose names match the parameter
 +   *
 +   * @throws Exception
 +   */
 +  @Test (timeout=300000)
 +  public void testSnapshotDeletionWithRegex() throws Exception {
 +    Admin admin = UTIL.getHBaseAdmin();
 +    // make sure we don't fail on listing snapshots
 +    SnapshotTestingUtils.assertNoSnapshots(admin);
 +
 +    // put some stuff in the table
-     HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
++    Table table = ConnectionFactory.createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
 +    UTIL.loadTable(table, TEST_FAM);
 +    table.close();
 +
 +    byte[] snapshot1 = Bytes.toBytes("TableSnapshot1");
 +    admin.snapshot(snapshot1, TABLE_NAME);
 +    LOG.debug("Snapshot1 completed.");
 +
 +    byte[] snapshot2 = Bytes.toBytes("TableSnapshot2");
 +    admin.snapshot(snapshot2, TABLE_NAME);
 +    LOG.debug("Snapshot2 completed.");
 +
 +    String snapshot3 = "3rdTableSnapshot";
 +    admin.snapshot(Bytes.toBytes(snapshot3), TABLE_NAME);
 +    LOG.debug(snapshot3 + " completed.");
 +
 +    // delete the first two snapshots
 +    admin.deleteSnapshots("TableSnapshot.*");
 +    List<SnapshotDescription> snapshots = admin.listSnapshots();
 +    assertEquals(1, snapshots.size());
 +    assertEquals(snapshots.get(0).getName(), snapshot3);
 +
 +    admin.deleteSnapshot(snapshot3);
 +    admin.close();
 +  }
 +  /**
 +   * Test snapshotting a table that is offline
 +   * @throws Exception
 +   */
 +  @Test (timeout=300000)
 +  public void testOfflineTableSnapshot() throws Exception {
 +    Admin admin = UTIL.getHBaseAdmin();
 +    // make sure we don't fail on listing snapshots
 +    SnapshotTestingUtils.assertNoSnapshots(admin);
 +
 +    // put some stuff in the table
-     HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
++    Table table = ConnectionFactory.createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
 +    UTIL.loadTable(table, TEST_FAM, false);
 +
 +    LOG.debug("FS state before disable:");
 +    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
 +      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 +    // XXX if this is flaky, might want to consider using the async version and looping, as
 +    // disableTable can succeed and still time out.
 +    admin.disableTable(TABLE_NAME);
 +
 +    LOG.debug("FS state before snapshot:");
 +    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
 +      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 +
 +    // take a snapshot of the disabled table
 +    final String SNAPSHOT_NAME = "offlineTableSnapshot";
 +    byte[] snapshot = Bytes.toBytes(SNAPSHOT_NAME);
 +
 +    SnapshotDescription desc = SnapshotDescription.newBuilder()
 +      .setType(SnapshotDescription.Type.DISABLED)
 +      .setTable(STRING_TABLE_NAME)
 +      .setName(SNAPSHOT_NAME)
 +      .setVersion(SnapshotManifestV1.DESCRIPTOR_VERSION)
 +      .build();
 +    admin.snapshot(desc);
 +    LOG.debug("Snapshot completed.");
 +
 +    // make sure we have the snapshot
 +    List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
 +      snapshot, TABLE_NAME);
 +
 +    // make sure it's a valid snapshot
 +    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
 +    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
 +    LOG.debug("FS state after snapshot:");
 +    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
 +      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 +
 +    SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
 +      admin, fs);
 +
 +    admin.deleteSnapshot(snapshot);
 +    snapshots = admin.listSnapshots();
 +    SnapshotTestingUtils.assertNoSnapshots(admin);
 +  }
 +
 +  /**
 +   * Verifies that asking for a snapshot of a table that does not exist fails with a
 +   * {@link SnapshotCreationException}.
 +   * @throws Exception on any unexpected failure
 +   */
 +  @Test (timeout=300000)
 +  public void testSnapshotFailsOnNonExistantTable() throws Exception {
 +    Admin admin = UTIL.getHBaseAdmin();
 +    // make sure we don't fail on listing snapshots
 +    SnapshotTestingUtils.assertNoSnapshots(admin);
 +
 +    // find a name that is not in use: keep appending "!" until the descriptor
 +    // lookup reports that the table is missing
 +    String candidate = "_not_a_table";
 +    boolean nameTaken = true;
 +    while (nameTaken) {
 +      try {
 +        admin.getTableDescriptor(TableName.valueOf(candidate));
 +        LOG.error("Table:" + candidate + " already exists, checking a new name");
 +        candidate = candidate + "!";
 +      } catch (TableNotFoundException e) {
 +        nameTaken = false;
 +      }
 +    }
 +
 +    // snapshot the non-existent table
 +    try {
 +      admin.snapshot("fail", TableName.valueOf(candidate));
 +      fail("Snapshot succeeded even though there is not table.");
 +    } catch (SnapshotCreationException e) {
 +      LOG.info("Correctly failed to snapshot a non-existant table:" + e.getMessage());
 +    }
 +  }
 +
 +  @Test (timeout=300000)
 +  public void testOfflineTableSnapshotWithEmptyRegions() throws Exception {
 +    // test with an empty table with one region
 +
 +    Admin admin = UTIL.getHBaseAdmin();
 +    // make sure we don't fail on listing snapshots
 +    SnapshotTestingUtils.assertNoSnapshots(admin);
 +
 +    LOG.debug("FS state before disable:");
 +    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
 +      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 +    admin.disableTable(TABLE_NAME);
 +
 +    LOG.debug("FS state before snapshot:");
 +    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
 +      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 +
 +    // take a snapshot of the disabled table
 +    byte[] snapshot = Bytes.toBytes("testOfflineTableSnapshotWithEmptyRegions");
 +    admin.snapshot(snapshot, TABLE_NAME);
 +    LOG.debug("Snapshot completed.");
 +
 +    // make sure we have the snapshot
 +    List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
 +      snapshot, TABLE_NAME);
 +
 +    // make sure it's a valid snapshot
 +    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
 +    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
 +    LOG.debug("FS state after snapshot:");
 +    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
 +      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 +
 +    List<byte[]> emptyCfs = Lists.newArrayList(TEST_FAM); // no file in the region
 +    List<byte[]> nonEmptyCfs = Lists.newArrayList();
 +    SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, nonEmptyCfs, emptyCfs,
 +      rootDir, admin, fs);
 +
 +    admin.deleteSnapshot(snapshot);
 +    snapshots = admin.listSnapshots();
 +    SnapshotTestingUtils.assertNoSnapshots(admin);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
index 5e28cd9,0000000..bc3354b
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@@ -1,86 -1,0 +1,87 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +
++import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.Assert;
 +
 +public class MobTestUtil {
 +  protected static final char FIRST_CHAR = 'a';
 +  protected static final char LAST_CHAR = 'z';
 +
 +  /**
 +   * Builds a string of the requested length from randomly chosen lower-case
 +   * ASCII letters.
 +   *
 +   * @param demoLength number of characters to generate
 +   * @return the generated string; empty when {@code demoLength} is not positive
 +   */
 +  protected static String generateRandomString(int demoLength) {
 +    String base = "abcdefghijklmnopqrstuvwxyz";
 +    Random random = new Random();
 +    // StringBuilder instead of StringBuffer: the buffer never leaves this method,
 +    // so synchronized appends buy nothing
 +    StringBuilder sb = new StringBuilder();
 +    for (int i = 0; i < demoLength; i++) {
 +      int number = random.nextInt(base.length());
 +      sb.append(base.charAt(number));
 +    }
 +    return sb.toString();
 +  }
 +  protected static void writeStoreFile(final StoreFile.Writer writer, String caseName)
 +      throws IOException {
 +    writeStoreFile(writer, Bytes.toBytes(caseName), Bytes.toBytes(caseName));
 +  }
 +
 +  /**
 +   * Appends one KeyValue per two-letter row key, from FIRST_CHAR/FIRST_CHAR through
 +   * LAST_CHAR/LAST_CHAR, to the passed writer and then closes it. Each cell uses the
 +   * row key bytes as its value and shares a single timestamp taken before the loop.
 +   *
 +   * @param writer the store file writer to fill; it is closed before this method
 +   *          returns, even when an append fails
 +   * @param fam the column family used for every cell
 +   * @param qualifier the column qualifier used for every cell
 +   * @throws IOException propagated from the writer
 +   */
 +  private static void writeStoreFile(final StoreFile.Writer writer, byte[] fam,
 +      byte[] qualifier) throws IOException {
 +    final long timestamp = System.currentTimeMillis();
 +    try {
 +      for (char first = FIRST_CHAR; first <= LAST_CHAR; first++) {
 +        for (char second = FIRST_CHAR; second <= LAST_CHAR; second++) {
 +          // the same two bytes serve as both the row key and the cell value
 +          final byte[] rowAndValue = { (byte) first, (byte) second };
 +          final KeyValue kv = new KeyValue(rowAndValue, fam, qualifier, timestamp, rowAndValue);
 +          writer.append(kv);
 +        }
 +      }
 +    } finally {
 +      writer.close();
 +    }
 +  }
 +
 +  /**
-    * Compare two KeyValue only for their row family qualifier value
++   * Compare two Cells only for their row family qualifier value
 +   */
-   public static void assertKeyValuesEquals(KeyValue firstKeyValue,
- 	      KeyValue secondKeyValue) {
++  public static void assertCellEquals(Cell firstKeyValue,
++	      Cell secondKeyValue) {
 +		    Assert.assertEquals(Bytes.toString(CellUtil.cloneRow(firstKeyValue)),
 +	            Bytes.toString(CellUtil.cloneRow(secondKeyValue)));
 +		    Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(firstKeyValue)),
 +	            Bytes.toString(CellUtil.cloneFamily(secondKeyValue)));
 +		    Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(firstKeyValue)),
 +	            Bytes.toString(CellUtil.cloneQualifier(secondKeyValue)));
 +		    Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(firstKeyValue)),
 +	            Bytes.toString(CellUtil.cloneValue(secondKeyValue)));
 +	  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
index e0e9541,0000000..b38e7cb
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
@@@ -1,154 -1,0 +1,154 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import junit.framework.TestCase;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HBaseConfiguration;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValue.Type;
- import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.testclassification.SmallTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(SmallTests.class)
 +public class TestCachedMobFile extends TestCase{
 +  static final Log LOG = LogFactory.getLog(TestCachedMobFile.class);
 +  private Configuration conf = HBaseConfiguration.create();
 +  private CacheConfig cacheConf = new CacheConfig(conf);
-   private final String TABLE = "tableName";
-   private final String FAMILY = "familyName";
-   private final String FAMILY1 = "familyName1";
-   private final String FAMILY2 = "familyName2";
-   private final long EXPECTED_REFERENCE_ZERO = 0;
-   private final long EXPECTED_REFERENCE_ONE = 1;
-   private final long EXPECTED_REFERENCE_TWO = 2;
++  private static final String TABLE = "tableName";
++  private static final String FAMILY = "familyName";
++  private static final String FAMILY1 = "familyName1";
++  private static final String FAMILY2 = "familyName2";
++  private static final long EXPECTED_REFERENCE_ZERO = 0;
++  private static final long EXPECTED_REFERENCE_ONE = 1;
++  private static final long EXPECTED_REFERENCE_TWO = 2;
 +
 +  @Test
 +  public void testOpenClose() throws Exception {
 +    String caseName = getName();
 +    FileSystem fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path outputDir = new Path(new Path(testDir, TABLE),
 +        FAMILY);
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
 +    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +        .withOutputDir(outputDir).withFileContext(meta).build();
 +    MobTestUtil.writeStoreFile(writer, caseName);
 +    CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
 +    Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount());
 +    cachedMobFile.open();
 +    Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount());
 +    cachedMobFile.open();
 +    Assert.assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile.getReferenceCount());
 +    cachedMobFile.close();
 +    Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount());
 +    cachedMobFile.close();
 +    Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount());
 +  }
 +
 +  /**
 +   * Writes two store files under different family directories, wraps each in a
 +   * CachedMobFile, records access(1) on the first and access(2) on the second, and
 +   * checks the results of compareTo in both directions and against itself.
 +   * @throws Exception on any unexpected failure
 +   */
 +  @Test
 +  public void testCompare() throws Exception {
 +    String caseName = getName();
 +    FileSystem fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path outputDir1 = new Path(new Path(testDir, TABLE),
 +        FAMILY1);
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
 +    StoreFile.Writer writer1 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +        .withOutputDir(outputDir1).withFileContext(meta).build();
 +    MobTestUtil.writeStoreFile(writer1, caseName);
 +    CachedMobFile cachedMobFile1 = CachedMobFile.create(fs, writer1.getPath(), conf, cacheConf);
 +    Path outputDir2 = new Path(new Path(testDir, TABLE),
 +        FAMILY2);
 +    StoreFile.Writer writer2 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +        .withOutputDir(outputDir2).withFileContext(meta).build();
 +    MobTestUtil.writeStoreFile(writer2, caseName);
 +    CachedMobFile cachedMobFile2 = CachedMobFile.create(fs, writer2.getPath(), conf, cacheConf);
 +    cachedMobFile1.access(1);
 +    cachedMobFile2.access(2);
 +    // assertEquals takes (expected, actual); keeping that order makes a failure
 +    // message report the right value as the expectation
 +    Assert.assertEquals(1, cachedMobFile1.compareTo(cachedMobFile2));
 +    Assert.assertEquals(-1, cachedMobFile2.compareTo(cachedMobFile1));
 +    Assert.assertEquals(0, cachedMobFile1.compareTo(cachedMobFile1));
 +  }
 +
 +  @Test
 +  public void testReadKeyValue() throws Exception {
 +    FileSystem fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path outputDir = new Path(new Path(testDir, TABLE), "familyname");
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
 +    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +        .withOutputDir(outputDir).withFileContext(meta).build();
 +    String caseName = getName();
 +    MobTestUtil.writeStoreFile(writer, caseName);
 +    CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
 +    byte[] family = Bytes.toBytes(caseName);
 +    byte[] qualify = Bytes.toBytes(caseName);
 +    // Test the start key
 +    byte[] startKey = Bytes.toBytes("aa");  // The start key bytes
 +    KeyValue expectedKey =
 +        new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
 +    KeyValue seekKey = expectedKey.createKeyOnly(false);
-     KeyValue kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    Cell cell = cachedMobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the end key
 +    byte[] endKey = Bytes.toBytes("zz");  // The end key bytes
 +    expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
 +    seekKey = expectedKey.createKeyOnly(false);
-     kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    cell = cachedMobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the random key
 +    byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
 +    expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
 +    seekKey = expectedKey.createKeyOnly(false);
-     kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    cell = cachedMobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the key which is less than the start key
 +    byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
 +    expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
 +    seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
-     kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    cell = cachedMobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the key which is more than the end key
 +    byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
 +    seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
-     kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
-     Assert.assertNull(kv);
++    cell = cachedMobFile.readCell(seekKey, false);
++    Assert.assertNull(cell);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
index 5e3a695,0000000..b91d4d1
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
@@@ -1,193 -1,0 +1,190 @@@
 +/**
 +*
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*     http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing, software
 +* distributed under the License is distributed on an "AS IS" BASIS,
 +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +* See the License for the specific language governing permissions and
 +* limitations under the License.
 +*/
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.IOException;
 +import java.util.List;
 +
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.MasterNotRunningException;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.ZooKeeperConnectionException;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(LargeTests.class)
 +public class TestDefaultMobStoreFlusher {
 +
 + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 + private final static byte [] row1 = Bytes.toBytes("row1");
 + private final static byte [] row2 = Bytes.toBytes("row2");
 + private final static byte [] family = Bytes.toBytes("family");
 + private final static byte [] qf1 = Bytes.toBytes("qf1");
 + private final static byte [] qf2 = Bytes.toBytes("qf2");
 + private final static byte [] value1 = Bytes.toBytes("value1");
 + private final static byte [] value2 = Bytes.toBytes("value2");
 +
 + @BeforeClass
 + public static void setUpBeforeClass() throws Exception {
 +   TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +   TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +
 +   TEST_UTIL.startMiniCluster(1);
 + }
 +
 + @AfterClass
 + public static void tearDownAfterClass() throws Exception {
 +   TEST_UTIL.shutdownMiniCluster();
 + }
 +
 + @Test
 + public void testFlushNonMobFile() throws InterruptedException {
 +   String TN = "testFlushNonMobFile";
-    HTable table = null;
++   TableName tn = TableName.valueOf(TN);
++   Table table = null;
 +   HBaseAdmin admin = null;
 +
 +   try {
-      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
++     HTableDescriptor desc = new HTableDescriptor(tn);
 +     HColumnDescriptor hcd = new HColumnDescriptor(family);
 +     hcd.setMaxVersions(4);
 +     desc.addFamily(hcd);
 +
-      admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
++     admin = TEST_UTIL.getHBaseAdmin();
 +     admin.createTable(desc);
-      table = new HTable(TEST_UTIL.getConfiguration(), TN);
++     table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++             .getTable(TableName.valueOf(TN));
 +
 +     //Put data
 +     Put put0 = new Put(row1);
-      put0.add(family, qf1, 1, value1);
++     put0.addColumn(family, qf1, 1, value1);
 +     table.put(put0);
 +
 +     //Put more data
 +     Put put1 = new Put(row2);
-      put1.add(family, qf2, 1, value2);
++     put1.addColumn(family, qf2, 1, value2);
 +     table.put(put1);
 +
 +     //Flush
-      table.flushCommits();
-      admin.flush(TN);
++     admin.flush(tn);
 +
 +     Scan scan = new Scan();
 +     scan.addColumn(family, qf1);
 +     scan.setMaxVersions(4);
 +     ResultScanner scanner = table.getScanner(scan);
 +
 +     //Compare
 +     Result result = scanner.next();
 +     int size = 0;
 +     while (result != null) {
 +       size++;
 +       List<Cell> cells = result.getColumnCells(family, qf1);
 +       // Verify the cell size
 +       Assert.assertEquals(1, cells.size());
 +       // Verify the value
 +       Assert.assertEquals(Bytes.toString(value1),
 +           Bytes.toString(CellUtil.cloneValue(cells.get(0))));
 +       result = scanner.next();
 +     }
 +     scanner.close();
 +     Assert.assertEquals(1, size);
 +     admin.close();
 +   } catch (MasterNotRunningException e1) {
 +     e1.printStackTrace();
 +   } catch (ZooKeeperConnectionException e2) {
 +     e2.printStackTrace();
 +   } catch (IOException e3) {
 +     e3.printStackTrace();
 +   }
 + }
 +
 + @Test
 + public void testFlushMobFile() throws InterruptedException {
 +   String TN = "testFlushMobFile";
-    HTable table = null;
-    HBaseAdmin admin = null;
++   TableName tn = TableName.valueOf(TN);
++   Table table = null;
++   Admin admin = null;
 +
 +   try {
-      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
++     HTableDescriptor desc = new HTableDescriptor(tn);
 +     HColumnDescriptor hcd = new HColumnDescriptor(family);
 +     hcd.setMobEnabled(true);
 +     hcd.setMobThreshold(3L);
 +     hcd.setMaxVersions(4);
 +     desc.addFamily(hcd);
 +
-      admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
++     Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
++     admin = c.getAdmin();
 +     admin.createTable(desc);
-      table = new HTable(TEST_UTIL.getConfiguration(), TN);
++     table = c.getTable(TableName.valueOf(TN));
 +
 +     //put data
 +     Put put0 = new Put(row1);
-      put0.add(family, qf1, 1, value1);
++     put0.addColumn(family, qf1, 1, value1);
 +     table.put(put0);
 +
 +     //put more data
 +     Put put1 = new Put(row2);
-      put1.add(family, qf2, 1, value2);
++     put1.addColumn(family, qf2, 1, value2);
 +     table.put(put1);
 +
 +     //flush
-      table.flushCommits();
-      admin.flush(TN);
++     admin.flush(tn);
 +
 +     //Scan
 +     Scan scan = new Scan();
 +     scan.addColumn(family, qf1);
 +     scan.setMaxVersions(4);
 +     ResultScanner scanner = table.getScanner(scan);
 +
 +     //Compare
 +     Result result = scanner.next();
 +     int size = 0;
 +     while (result != null) {
 +       size++;
 +       List<Cell> cells = result.getColumnCells(family, qf1);
 +       // Verify the cell size
 +       Assert.assertEquals(1, cells.size());
 +       // Verify the value
 +       Assert.assertEquals(Bytes.toString(value1),
 +           Bytes.toString(CellUtil.cloneValue(cells.get(0))));
 +       result = scanner.next();
 +     }
 +     scanner.close();
 +     Assert.assertEquals(1, size);
 +     admin.close();
 +   } catch (MasterNotRunningException e1) {
 +     e1.printStackTrace();
 +   } catch (ZooKeeperConnectionException e2) {
 +     e2.printStackTrace();
 +   } catch (IOException e3) {
 +     e3.printStackTrace();
 +   }
 + }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
index f16fa20,0000000..dfaeca6
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
@@@ -1,179 -1,0 +1,175 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.Random;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
- import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestExpiredMobFileCleaner {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
 +  private final static String family = "family";
 +  private final static byte[] row1 = Bytes.toBytes("row1");
 +  private final static byte[] row2 = Bytes.toBytes("row2");
 +  private final static byte[] qf = Bytes.toBytes("qf");
 +
-   private static HTable table;
++  private static BufferedMutator table;
 +  private static Admin admin;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +
 +  }
 +
 +  @Before
 +  public void setUp() throws Exception {
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    admin.disableTable(tableName);
 +    admin.deleteTable(tableName);
 +    admin.close();
 +    TEST_UTIL.shutdownMiniCluster();
 +    TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
 +  }
 +
 +  private void init() throws Exception {
 +    HTableDescriptor desc = new HTableDescriptor(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    hcd.setMaxVersions(4);
 +    desc.addFamily(hcd);
 +
 +    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), tableName);
-     table.setAutoFlush(false, false);
++    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++            .getBufferedMutator(tableName);
 +  }
 +
 +  private void modifyColumnExpiryDays(int expireDays) throws Exception {
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    // set the TTL to the given number of expire days so that some rows become expired
 +    int timeToLive = expireDays * secondsOfDay();
 +    hcd.setTimeToLive(timeToLive);
 +
 +    admin.modifyColumn(tableName, hcd);
 +  }
 +
-   private void putKVAndFlush(HTable table, byte[] row, byte[] value, long ts)
++  private void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts)
 +      throws Exception {
 +
 +    Put put = new Put(row, ts);
-     put.add(Bytes.toBytes(family), qf, value);
-     table.put(put);
++    put.addColumn(Bytes.toBytes(family), qf, value);
++    table.mutate(put);
 +
-     table.flushCommits();
++    table.flush();
 +    admin.flush(tableName);
 +  }
 +
 +  /**
 +   * Creates a 3 day old hfile and a 1 day old hfile, then sets expiry to 2 days.
 +   * Verifies that the 3 day old hfile is removed but the 1 day one is still present
 +   * after the expiry based cleaner is run.
 +   */
 +  @Test
 +  public void testCleaner() throws Exception {
 +    init();
 +
 +    Path mobDirPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
 +
 +    byte[] dummyData = makeDummyData(600);
 +    long ts = System.currentTimeMillis() - 3 * secondsOfDay() * 1000; // 3 days before
 +    putKVAndFlush(table, row1, dummyData, ts);
 +    FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
 +    //the first mob file
 +    assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
 +    String firstFile = firstFiles[0].getPath().getName();
 +
 +    ts = System.currentTimeMillis() - 1 * secondsOfDay() * 1000; // 1 day before
 +    putKVAndFlush(table, row2, dummyData, ts);
 +    FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
 +    //now there are 2 mob files
 +    assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
 +    String f1 = secondFiles[0].getPath().getName();
 +    String f2 = secondFiles[1].getPath().getName();
 +    String secondFile = f1.equals(firstFile) ? f2 : f1;
 +
 +    modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
 +
 +    //run the cleaner
 +    String[] args = new String[2];
 +    args[0] = tableName.getNameAsString();
 +    args[1] = family;
 +    ToolRunner.run(TEST_UTIL.getConfiguration(), new ExpiredMobFileCleaner(), args);
 +
 +    FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
 +    String lastFile = filesAfterClean[0].getPath().getName();
 +    //the first mob file is removed
 +    assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
 +    assertEquals("After cleanup without delay 2", secondFile, lastFile);
 +  }
 +
 +  private Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
 +    Path p = new Path(MobUtils.getMobRegionPath(conf, tableName), familyName);
 +    return p;
 +  }
 +
 +  private int secondsOfDay() {
 +    return 24 * 3600;
 +  }
 +
 +  private byte[] makeDummyData(int size) {
 +    byte [] dummyData = new byte[size];
 +    new Random().nextBytes(dummyData);
 +    return dummyData;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
index 055eac3,0000000..15aa7d4
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
@@@ -1,141 -1,0 +1,135 @@@
 +/**
 +*
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*     http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing, software
 +* distributed under the License is distributed on an "AS IS" BASIS,
 +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +* See the License for the specific language governing permissions and
 +* limitations under the License.
 +*/
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.util.List;
 +import java.util.Random;
 +
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 +import org.apache.hadoop.hbase.testclassification.MediumTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(MediumTests.class)
 +public class TestMobDataBlockEncoding {
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static byte [] row1 = Bytes.toBytes("row1");
 +  private final static byte [] family = Bytes.toBytes("family");
 +  private final static byte [] qf1 = Bytes.toBytes("qualifier1");
 +  private final static byte [] qf2 = Bytes.toBytes("qualifier2");
 +  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
-   private static HTable table;
++  private static Table table;
 +  private static HBaseAdmin admin;
 +  private static HColumnDescriptor hcd;
 +  private static HTableDescriptor desc;
 +  private static Random random = new Random();
 +  private static long defaultThreshold = 10;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +
 +    TEST_UTIL.startMiniCluster(1);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  public void setUp(long threshold, String TN, DataBlockEncoding encoding)
 +      throws Exception {
 +    desc = new HTableDescriptor(TableName.valueOf(TN));
 +    hcd = new HColumnDescriptor(family);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(threshold);
 +    hcd.setMaxVersions(4);
 +    hcd.setDataBlockEncoding(encoding);
 +    desc.addFamily(hcd);
-     admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
++    admin = TEST_UTIL.getHBaseAdmin();
 +    admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), TN);
++    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++            .getTable(TableName.valueOf(TN));
 +  }
 +
 +  /**
 +   * Generate the mob value.
 +   *
 +   * @param size the size of the value
 +   * @return the mob value generated
 +   */
 +  private static byte[] generateMobValue(int size) {
 +    byte[] mobVal = new byte[size];
 +    random.nextBytes(mobVal);
 +    return mobVal;
 +  }
 +
 +  @Test
 +  public void testDataBlockEncoding() throws Exception {
 +    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
 +      testDataBlockEncoding(encoding);
 +    }
 +  }
 +
 +  public void testDataBlockEncoding(DataBlockEncoding encoding) throws Exception {
 +    String TN = "testDataBlockEncoding" + encoding;
 +    setUp(defaultThreshold, TN, encoding);
 +    long ts1 = System.currentTimeMillis();
 +    long ts2 = ts1 + 1;
 +    long ts3 = ts1 + 2;
 +    byte[] value = generateMobValue((int) defaultThreshold + 1);
 +
 +    Put put1 = new Put(row1);
-     put1.add(family, qf1, ts3, value);
-     put1.add(family, qf2, ts2, value);
-     put1.add(family, qf3, ts1, value);
++    put1.addColumn(family, qf1, ts3, value);
++    put1.addColumn(family, qf2, ts2, value);
++    put1.addColumn(family, qf3, ts1, value);
 +    table.put(put1);
- 
-     table.flushCommits();
-     admin.flush(TN);
++    admin.flush(TableName.valueOf(TN));
 +
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(4);
 +
 +    ResultScanner results = table.getScanner(scan);
 +    int count = 0;
 +    for (Result res : results) {
 +      List<Cell> cells = res.listCells();
 +      for(Cell cell : cells) {
 +        // Verify the value
 +        Assert.assertEquals(Bytes.toString(value),
 +            Bytes.toString(CellUtil.cloneValue(cell)));
 +        count++;
 +      }
 +    }
 +    results.close();
 +    Assert.assertEquals(3, count);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
index 01050ae,0000000..d05da24
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
@@@ -1,124 -1,0 +1,124 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import junit.framework.TestCase;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValue.Type;
- import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.regionserver.BloomType;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 +import org.apache.hadoop.hbase.testclassification.SmallTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(SmallTests.class)
 +public class TestMobFile extends TestCase {
 +  static final Log LOG = LogFactory.getLog(TestMobFile.class);
 +  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private Configuration conf = TEST_UTIL.getConfiguration();
 +  private CacheConfig cacheConf =  new CacheConfig(conf);
 +  private final String TABLE = "tableName";
 +  private final String FAMILY = "familyName";
 +
 +  @Test
 +  public void testReadKeyValue() throws Exception {
 +    FileSystem fs = FileSystem.get(conf);
 +	Path testDir = FSUtils.getRootDir(conf);
 +    Path outputDir = new Path(new Path(testDir, TABLE), FAMILY);
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
 +    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +            .withOutputDir(outputDir)
 +            .withFileContext(meta)
 +            .build();
 +    String caseName = getName();
 +    MobTestUtil.writeStoreFile(writer, caseName);
 +
 +    MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
 +        conf, cacheConf, BloomType.NONE));
 +    byte[] family = Bytes.toBytes(caseName);
 +    byte[] qualify = Bytes.toBytes(caseName);
 +
 +    // Test the start key
 +    byte[] startKey = Bytes.toBytes("aa");  // The start key bytes
 +    KeyValue expectedKey =
 +        new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
 +    KeyValue seekKey = expectedKey.createKeyOnly(false);
-     KeyValue kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    Cell cell = mobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the end key
 +    byte[] endKey = Bytes.toBytes("zz");  // The end key bytes
 +    expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
 +    seekKey = expectedKey.createKeyOnly(false);
-     kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    cell = mobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the random key
 +    byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
 +    expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
 +    seekKey = expectedKey.createKeyOnly(false);
-     kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    cell = mobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the key which is less than the start key
 +    byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
 +    expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
 +    seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
-     kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
-     MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
++    cell = mobFile.readCell(seekKey, false);
++    MobTestUtil.assertCellEquals(expectedKey, cell);
 +
 +    // Test the key which is more than the end key
 +    byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
 +    seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
-     kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
-     assertNull(kv);
++    cell = mobFile.readCell(seekKey, false);
++    assertNull(cell);
 +  }
 +
 +  @Test
 +  public void testGetScanner() throws Exception {
 +    FileSystem fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path outputDir = new Path(new Path(testDir, TABLE), FAMILY);
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
 +    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +            .withOutputDir(outputDir)
 +            .withFileContext(meta)
 +            .build();
 +    MobTestUtil.writeStoreFile(writer, getName());
 +
 +    MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
 +        conf, cacheConf, BloomType.NONE));
 +    assertNotNull(mobFile.getScanner());
 +    assertTrue(mobFile.getScanner() instanceof StoreFileScanner);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
index 1a809a1,0000000..95fa1b9
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
@@@ -1,207 -1,0 +1,202 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.IOException;
 +import java.util.Date;
 +
 +import junit.framework.TestCase;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseConfiguration;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.regionserver.HMobStore;
 +import org.apache.hadoop.hbase.regionserver.HRegion;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
- import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 +import org.apache.hadoop.hbase.testclassification.SmallTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(SmallTests.class)
 +public class TestMobFileCache extends TestCase {
 +  static final Log LOG = LogFactory.getLog(TestMobFileCache.class);
 +  private HBaseTestingUtility UTIL;
 +  private HRegion region;
 +  private Configuration conf;
 +  private MobCacheConfig mobCacheConf;
 +  private MobFileCache mobFileCache;
 +  private Date currentDate = new Date();
-   private final String TEST_CACHE_SIZE = "2";
-   private final int EXPECTED_CACHE_SIZE_ZERO = 0;
-   private final int EXPECTED_CACHE_SIZE_ONE = 1;
-   private final int EXPECTED_CACHE_SIZE_TWO = 2;
-   private final int EXPECTED_CACHE_SIZE_THREE = 3;
-   private final long EXPECTED_REFERENCE_ONE = 1;
-   private final long EXPECTED_REFERENCE_TWO = 2;
- 
-   private final String TABLE = "tableName";
-   private final String FAMILY1 = "family1";
-   private final String FAMILY2 = "family2";
-   private final String FAMILY3 = "family3";
- 
-   private final byte[] ROW = Bytes.toBytes("row");
-   private final byte[] ROW2 = Bytes.toBytes("row2");
-   private final byte[] VALUE = Bytes.toBytes("value");
-   private final byte[] VALUE2 = Bytes.toBytes("value2");
-   private final byte[] QF1 = Bytes.toBytes("qf1");
-   private final byte[] QF2 = Bytes.toBytes("qf2");
-   private final byte[] QF3 = Bytes.toBytes("qf3");
++  private static final String TEST_CACHE_SIZE = "2";
++  private static final int EXPECTED_CACHE_SIZE_ZERO = 0;
++  private static final int EXPECTED_CACHE_SIZE_ONE = 1;
++  private static final int EXPECTED_CACHE_SIZE_TWO = 2;
++  private static final int EXPECTED_CACHE_SIZE_THREE = 3;
++  private static final long EXPECTED_REFERENCE_ONE = 1;
++  private static final long EXPECTED_REFERENCE_TWO = 2;
++
++  private static final String TABLE = "tableName";
++  private static final String FAMILY1 = "family1";
++  private static final String FAMILY2 = "family2";
++  private static final String FAMILY3 = "family3";
++
++  private static final byte[] ROW = Bytes.toBytes("row");
++  private static final byte[] ROW2 = Bytes.toBytes("row2");
++  private static final byte[] VALUE = Bytes.toBytes("value");
++  private static final byte[] VALUE2 = Bytes.toBytes("value2");
++  private static final byte[] QF1 = Bytes.toBytes("qf1");
++  private static final byte[] QF2 = Bytes.toBytes("qf2");
++  private static final byte[] QF3 = Bytes.toBytes("qf3");
 +
 +  @Override
 +  public void setUp() throws Exception {
 +    UTIL = HBaseTestingUtility.createLocalHTU();
 +    conf = UTIL.getConfiguration();
 +    HTableDescriptor htd = UTIL.createTableDescriptor("testMobFileCache");
 +    HColumnDescriptor hcd1 = new HColumnDescriptor(FAMILY1);
 +    hcd1.setMobEnabled(true);
 +    hcd1.setMobThreshold(0);
 +    HColumnDescriptor hcd2 = new HColumnDescriptor(FAMILY2);
 +    hcd2.setMobEnabled(true);
 +    hcd2.setMobThreshold(0);
 +    HColumnDescriptor hcd3 = new HColumnDescriptor(FAMILY3);
 +    hcd3.setMobEnabled(true);
 +    hcd3.setMobThreshold(0);
 +    htd.addFamily(hcd1);
 +    htd.addFamily(hcd2);
 +    htd.addFamily(hcd3);
 +    region = UTIL.createLocalHRegion(htd, null, null);
 +  }
 +
 +  @Override
 +  protected void tearDown() throws Exception {
 +    region.close();
 +    region.getFilesystem().delete(UTIL.getDataTestDir(), true);
 +  }
 +
 +  /**
 +   * Create the mob store file.
-    * @param family
 +   */
 +  private Path createMobStoreFile(String family) throws IOException {
 +    return createMobStoreFile(HBaseConfiguration.create(), family);
 +  }
 +
 +  /**
 +   * Create the mob store file
-    * @param conf
-    * @param family
 +   */
 +  private Path createMobStoreFile(Configuration conf, String family) throws IOException {
 +    HColumnDescriptor hcd = new HColumnDescriptor(family);
 +    hcd.setMaxVersions(4);
 +    hcd.setMobEnabled(true);
 +    mobCacheConf = new MobCacheConfig(conf, hcd);
-     return createMobStoreFile(conf, hcd);
++    return createMobStoreFile(hcd);
 +  }
 +
 +  /**
 +   * Create the mob store file
-    * @param conf
-    * @param hcd
 +   */
-   private Path createMobStoreFile(Configuration conf, HColumnDescriptor hcd)
++  private Path createMobStoreFile(HColumnDescriptor hcd)
 +      throws IOException {
 +    // Setting up a Store
-     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
++    TableName tn = TableName.valueOf(TABLE);
++    HTableDescriptor htd = new HTableDescriptor(tn);
 +    htd.addFamily(hcd);
 +    HMobStore mobStore = (HMobStore) region.getStore(hcd.getName());
 +    KeyValue key1 = new KeyValue(ROW, hcd.getName(), QF1, 1, VALUE);
 +    KeyValue key2 = new KeyValue(ROW, hcd.getName(), QF2, 1, VALUE);
 +    KeyValue key3 = new KeyValue(ROW2, hcd.getName(), QF3, 1, VALUE2);
 +    KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
 +    int maxKeyCount = keys.length;
-     HRegionInfo regionInfo = new HRegionInfo();
++    HRegionInfo regionInfo = new HRegionInfo(tn);
 +    StoreFile.Writer mobWriter = mobStore.createWriterInTmp(currentDate,
 +        maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey());
 +    Path mobFilePath = mobWriter.getPath();
 +    String fileName = mobFilePath.getName();
 +    mobWriter.append(key1);
 +    mobWriter.append(key2);
 +    mobWriter.append(key3);
 +    mobWriter.close();
 +    String targetPathName = MobUtils.formatDate(currentDate);
 +    Path targetPath = new Path(mobStore.getPath(), targetPathName);
 +    mobStore.commitFile(mobFilePath, targetPath);
 +    return new Path(targetPath, fileName);
 +  }
 +
 +  @Test
 +  public void testMobFileCache() throws Exception {
 +    FileSystem fs = FileSystem.get(conf);
 +    conf.set(MobConstants.MOB_FILE_CACHE_SIZE_KEY, TEST_CACHE_SIZE);
 +    mobFileCache = new MobFileCache(conf);
 +    Path file1Path = createMobStoreFile(FAMILY1);
 +    Path file2Path = createMobStoreFile(FAMILY2);
 +    Path file3Path = createMobStoreFile(FAMILY3);
 +
 +    // Before open one file by the MobFileCache
 +    assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
 +    // Open one file by the MobFileCache
 +    CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
 +        fs, file1Path, mobCacheConf);
 +    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
 +    assertNotNull(cachedMobFile1);
 +    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
 +
 +    // Eviction is also managed by a scheduled thread pool,
 +    // and its check period is set to 3600 seconds by default.
 +    // This explicit evict should get the lock most of the time.
 +    mobFileCache.evict();  // Cache not full, evict it
 +    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
 +    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
 +
 +    mobFileCache.evictFile(file1Path.getName());  // Evict one file
 +    assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
 +    assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
 +
 +    cachedMobFile1.close();  // Close the cached mob file
 +
 +    // Reopen three cached files
 +    cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
 +        fs, file1Path, mobCacheConf);
 +    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
 +    CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile(
 +        fs, file2Path, mobCacheConf);
 +    assertEquals(EXPECTED_CACHE_SIZE_TWO, mobFileCache.getCacheSize());
 +    CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile(
 +        fs, file3Path, mobCacheConf);
 +    // Before the evict
-     // Evict the cache, should clost the first file 1
++    // Evict the cache, should close the first file 1
 +    assertEquals(EXPECTED_CACHE_SIZE_THREE, mobFileCache.getCacheSize());
 +    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
 +    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile2.getReferenceCount());
 +    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
 +    mobFileCache.evict();
 +    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
 +    assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
 +    assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile2.getReferenceCount());
 +    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
 +  }
 +}