You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/04 14:43:54 UTC

[1/2] HBASE-11643 Read and write MOB in HBase (Jingcheng Du)

Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 [created] 5c14f749b


http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
new file mode 100644
index 0000000..5e28cd9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+public class MobTestUtil {
+  protected static final char FIRST_CHAR = 'a';
+  protected static final char LAST_CHAR = 'z';
+
+  /** Returns {@code demoLength} characters drawn at random from the lowercase letters a-z. */
+  protected static String generateRandomString(int demoLength) {
+    final String alphabet = "abcdefghijklmnopqrstuvwxyz";
+    final Random rng = new Random();
+    final StringBuilder out = new StringBuilder();
+    for (int remaining = demoLength; remaining > 0; remaining--) {
+      out.append(alphabet.charAt(rng.nextInt(alphabet.length())));
+    }
+    return out.toString();
+  }
+  /** Writes the standard test cells, using {@code caseName} as both family and qualifier. */
+  protected static void writeStoreFile(final StoreFile.Writer writer, String caseName)
+      throws IOException {
+    writeStoreFile(writer, Bytes.toBytes(caseName), Bytes.toBytes(caseName));
+  }
+
+  /**
+   * Appends one KeyValue per two-letter row from "aa" to "zz", all sharing one timestamp and
+   * using the row bytes as the value, then closes the writer (also when an append fails).
+   * @param writer the store file writer to fill and close
+   * @param fam family bytes for every cell; {@code qualifier} is likewise shared by all cells
+   * @throws IOException propagated from the writer
+   */
+  private static void writeStoreFile(final StoreFile.Writer writer, byte[] fam,
+      byte[] qualifier) throws IOException {
+    long now = System.currentTimeMillis();
+    try {
+      for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
+        for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
+          byte[] b = new byte[] { (byte) d, (byte) e };
+          writer.append(new KeyValue(b, fam, qualifier, now, b));
+        }
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Asserts two KeyValues match on row, family, qualifier and value bytes (nothing else).
+   */
+  public static void assertKeyValuesEquals(KeyValue firstKeyValue, KeyValue secondKeyValue) {
+    // Compare raw bytes: decoding to String first can hide differences in non-UTF-8 data
+    Assert.assertArrayEquals("row", CellUtil.cloneRow(firstKeyValue),
+        CellUtil.cloneRow(secondKeyValue));
+    Assert.assertArrayEquals("family", CellUtil.cloneFamily(firstKeyValue),
+        CellUtil.cloneFamily(secondKeyValue));
+    Assert.assertArrayEquals("qualifier", CellUtil.cloneQualifier(firstKeyValue),
+        CellUtil.cloneQualifier(secondKeyValue));
+    Assert.assertArrayEquals("value", CellUtil.cloneValue(firstKeyValue),
+        CellUtil.cloneValue(secondKeyValue));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
new file mode 100644
index 0000000..b39dd2a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCachedMobFile extends TestCase{
+  static final Log LOG = LogFactory.getLog(TestCachedMobFile.class);
+  private Configuration conf = HBaseConfiguration.create();
+  private CacheConfig cacheConf = new CacheConfig(conf);
+  private final String TABLE = "tableName";
+  private final String FAMILY = "familyName";
+  private final String FAMILY1 = "familyName1";
+  private final String FAMILY2 = "familyName2";
+  private final long EXPECTED_REFERENCE_ZERO = 0;
+  private final long EXPECTED_REFERENCE_ONE = 1;
+  private final long EXPECTED_REFERENCE_TWO = 2;
+
+  /** Checks that every open() adds one reference and every close() releases one. */
+  @Test
+  public void testOpenClose() throws Exception {
+    FileSystem fileSystem = FileSystem.get(conf);
+    Path familyDir = new Path(new Path(FSUtils.getRootDir(conf), TABLE), FAMILY);
+    HFileContext fileContext = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    StoreFile.Writer storeWriter = new StoreFile.WriterBuilder(conf, cacheConf, fileSystem)
+        .withOutputDir(familyDir).withFileContext(fileContext).build();
+    MobTestUtil.writeStoreFile(storeWriter, getName());
+    CachedMobFile mobFile =
+        CachedMobFile.create(fileSystem, storeWriter.getPath(), conf, cacheConf);
+    // The count starts at zero, then follows the open()/close() calls one for one
+    Assert.assertEquals(EXPECTED_REFERENCE_ZERO, mobFile.getReferenceCount());
+    mobFile.open();
+    Assert.assertEquals(EXPECTED_REFERENCE_ONE, mobFile.getReferenceCount());
+    mobFile.open();
+    Assert.assertEquals(EXPECTED_REFERENCE_TWO, mobFile.getReferenceCount());
+    mobFile.close();
+    Assert.assertEquals(EXPECTED_REFERENCE_ONE, mobFile.getReferenceCount());
+    mobFile.close();
+    Assert.assertEquals(EXPECTED_REFERENCE_ZERO, mobFile.getReferenceCount());
+  }
+
+  /**
+   * Checks compareTo ordering: the file accessed with the smaller id compares as greater.
+   */
+  @Test
+  public void testCompare() throws Exception {
+    String caseName = getName();
+    FileSystem fs = FileSystem.get(conf);
+    Path testDir = FSUtils.getRootDir(conf);
+    Path outputDir1 = new Path(new Path(testDir, TABLE), FAMILY1);
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    StoreFile.Writer writer1 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+        .withOutputDir(outputDir1).withFileContext(meta).build();
+    MobTestUtil.writeStoreFile(writer1, caseName);
+    CachedMobFile cachedMobFile1 = CachedMobFile.create(fs, writer1.getPath(), conf, cacheConf);
+    Path outputDir2 = new Path(new Path(testDir, TABLE), FAMILY2);
+    StoreFile.Writer writer2 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+        .withOutputDir(outputDir2).withFileContext(meta).build();
+    MobTestUtil.writeStoreFile(writer2, caseName);
+    CachedMobFile cachedMobFile2 = CachedMobFile.create(fs, writer2.getPath(), conf, cacheConf);
+    cachedMobFile1.access(1);
+    cachedMobFile2.access(2);
+    // JUnit expects (expected, actual); keeping that order makes failure messages accurate
+    Assert.assertEquals(1, cachedMobFile1.compareTo(cachedMobFile2));
+    Assert.assertEquals(-1, cachedMobFile2.compareTo(cachedMobFile1));
+    Assert.assertEquals(0, cachedMobFile1.compareTo(cachedMobFile1));
+  }
+
+  /** Seeks the first, last, a random, a too-small and a too-large row in a cached mob file. */
+  @Test
+  public void testReadKeyValue() throws Exception {
+    String caseName = getName();
+    FileSystem fileSystem = FileSystem.get(conf);
+    Path familyDir = new Path(new Path(FSUtils.getRootDir(conf), TABLE), "familyname");
+    HFileContext fileContext = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    StoreFile.Writer storeWriter = new StoreFile.WriterBuilder(conf, cacheConf, fileSystem)
+        .withOutputDir(familyDir).withFileContext(fileContext).build();
+    MobTestUtil.writeStoreFile(storeWriter, caseName);
+    CachedMobFile mobFile =
+        CachedMobFile.create(fileSystem, storeWriter.getPath(), conf, cacheConf);
+    byte[] cf = Bytes.toBytes(caseName);
+    byte[] cq = Bytes.toBytes(caseName);
+    // Exact seek to the first row in the file
+    byte[] firstRow = Bytes.toBytes("aa");
+    KeyValue expected = new KeyValue(firstRow, cf, cq, Long.MAX_VALUE, Type.Put, firstRow);
+    KeyValue probe = expected.createKeyOnly(false);
+    KeyValue found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // Exact seek to the last row in the file
+    byte[] lastRow = Bytes.toBytes("zz");
+    expected = new KeyValue(lastRow, cf, cq, Long.MAX_VALUE, Type.Put, lastRow);
+    probe = expected.createKeyOnly(false);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // Exact seek to a randomly chosen two-letter row
+    byte[] anyRow = Bytes.toBytes(MobTestUtil.generateRandomString(2));
+    expected = new KeyValue(anyRow, cf, cq, Long.MAX_VALUE, Type.Put, anyRow);
+    probe = expected.createKeyOnly(false);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // A row sorting before "aa" is expected to land on the first row
+    byte[] beforeFirst = Bytes.toBytes("a1");
+    expected = new KeyValue(firstRow, cf, cq, Long.MAX_VALUE, Type.Put, firstRow);
+    probe = new KeyValue(beforeFirst, cf, cq, Long.MAX_VALUE, Type.Put, beforeFirst);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // A row sorting after "zz" is expected to find nothing
+    byte[] afterLast = Bytes.toBytes("z{");
+    probe = new KeyValue(afterLast, cf, cq, Long.MAX_VALUE, Type.Put, afterLast);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    Assert.assertNull(found);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
new file mode 100644
index 0000000..5d85718
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
@@ -0,0 +1,193 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestDefaultMobStoreFlusher {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static byte [] row1 = Bytes.toBytes("row1");
+ private final static byte [] row2 = Bytes.toBytes("row2");
+ private final static byte [] family = Bytes.toBytes("family");
+ private final static byte [] qf1 = Bytes.toBytes("qf1");
+ private final static byte [] qf2 = Bytes.toBytes("qf2");
+ private final static byte [] value1 = Bytes.toBytes("value1");
+ private final static byte [] value2 = Bytes.toBytes("value2");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+   TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+   // One mini cluster is started here and shared by every test in the class
+   TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+   TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /** Puts two rows into a plain family, flushes, and expects a qf1 scan to return value1 once. */
+ @Test
+ public void testFlushNonMobFile() throws Exception {
+   String TN = "testFlushNonMobFile";
+   HTable table = null;
+   HBaseAdmin admin = null;
+   // Let failures reach JUnit: catching and printing them would let a broken run pass
+   try {
+     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
+     HColumnDescriptor hcd = new HColumnDescriptor(family);
+     hcd.setMaxVersions(4);
+     desc.addFamily(hcd);
+     admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+     admin.createTable(desc);
+     table = new HTable(TEST_UTIL.getConfiguration(), TN);
+
+     //Put data
+     Put put0 = new Put(row1);
+     put0.add(family, qf1, 1, value1);
+     table.put(put0);
+
+     //Put more data
+     Put put1 = new Put(row2);
+     put1.add(family, qf2, 1, value2);
+     table.put(put1);
+
+     //Flush
+     table.flushCommits();
+     admin.flush(TN);
+
+     Scan scan = new Scan();
+     scan.addColumn(family, qf1);
+     scan.setMaxVersions(4);
+     ResultScanner scanner = table.getScanner(scan);
+
+     //Compare
+     Result result = scanner.next();
+     int size = 0;
+     while (result != null) {
+       size++;
+       List<Cell> cells = result.getColumnCells(family, qf1);
+       // Verify the cell size
+       Assert.assertEquals(1, cells.size());
+       // Verify the value
+       Assert.assertEquals(Bytes.toString(value1),
+           Bytes.toString(CellUtil.cloneValue(cells.get(0))));
+       result = scanner.next();
+     }
+     scanner.close();
+     Assert.assertEquals(1, size);
+   } finally {
+     if (table != null) {
+       table.close();
+     }
+     if (admin != null) {
+       admin.close();
+     }
+   }
+ }
+
+ /** Same put/flush/scan check as the non-MOB test, on a family with IS_MOB set. */
+ @Test
+ public void testFlushMobFile() throws Exception {
+   String TN = "testFlushMobFile";
+   HTable table = null;
+   HBaseAdmin admin = null;
+   // Let failures reach JUnit: catching and printing them would let a broken run pass
+   try {
+     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
+     HColumnDescriptor hcd = new HColumnDescriptor(family);
+     hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+     hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+     hcd.setMaxVersions(4);
+     desc.addFamily(hcd);
+     admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+     admin.createTable(desc);
+     table = new HTable(TEST_UTIL.getConfiguration(), TN);
+
+     //put data
+     Put put0 = new Put(row1);
+     put0.add(family, qf1, 1, value1);
+     table.put(put0);
+
+     //put more data
+     Put put1 = new Put(row2);
+     put1.add(family, qf2, 1, value2);
+     table.put(put1);
+
+     //flush
+     table.flushCommits();
+     admin.flush(TN);
+
+     //Scan
+     Scan scan = new Scan();
+     scan.addColumn(family, qf1);
+     scan.setMaxVersions(4);
+     ResultScanner scanner = table.getScanner(scan);
+
+     //Compare
+     Result result = scanner.next();
+     int size = 0;
+     while (result != null) {
+       size++;
+       List<Cell> cells = result.getColumnCells(family, qf1);
+       // Verify the cell size
+       Assert.assertEquals(1, cells.size());
+       // Verify the value
+       Assert.assertEquals(Bytes.toString(value1),
+           Bytes.toString(CellUtil.cloneValue(cells.get(0))));
+       result = scanner.next();
+     }
+     scanner.close();
+     Assert.assertEquals(1, size);
+   } finally {
+     if (table != null) {
+       table.close();
+     }
+     if (admin != null) {
+       admin.close();
+     }
+   }
+ }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
new file mode 100644
index 0000000..ae10aad
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
@@ -0,0 +1,141 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.mob;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobDataBlockEncoding {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static byte [] row1 = Bytes.toBytes("row1");
+  private final static byte [] family = Bytes.toBytes("family");
+  private final static byte [] qf1 = Bytes.toBytes("qualifier1");
+  private final static byte [] qf2 = Bytes.toBytes("qualifier2");
+  protected final byte[] qf3 = Bytes.toBytes("qualifier3");
+  private static HTable table;
+  private static HBaseAdmin admin;
+  private static HColumnDescriptor hcd;
+  private static HTableDescriptor desc;
+  private static Random random = new Random();
+  private static long defaultThreshold = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    // One mini cluster is started here and shared by every test in the class
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates table {@code TN} with one MOB family using the given threshold and encoding. */
+  public void setUp(long threshold, String TN, DataBlockEncoding encoding) throws Exception {
+    desc = new HTableDescriptor(TableName.valueOf(TN));
+    hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(4);
+    hcd.setDataBlockEncoding(encoding);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold));
+    desc.addFamily(hcd);
+    admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), TN);
+  }
+
+  /**
+   * Builds a random payload to use as a mob cell value.
+   *
+   * @param size number of random bytes to produce
+   * @return a new array of {@code size} random bytes
+   */
+  private static byte[] generateMobValue(int size) {
+    final byte[] payload = new byte[size];
+    random.nextBytes(payload);
+    return payload;
+  }
+
+  @Test
+  public void testDataBlockEncoding() throws Exception {
+    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      testDataBlockEncoding(encoding);
+    }
+  }
+
+  /**
+   * Round-trips three above-threshold values through put, flush and scan for one encoding.
+   */
+  public void testDataBlockEncoding(DataBlockEncoding encoding) throws Exception {
+    String TN = "testDataBlockEncoding" + encoding;
+    setUp(defaultThreshold, TN, encoding);
+    try {
+      long ts1 = System.currentTimeMillis();
+      byte[] value = generateMobValue((int) defaultThreshold + 1);
+      Put put1 = new Put(row1);
+      put1.add(family, qf1, ts1 + 2, value);
+      put1.add(family, qf2, ts1 + 1, value);
+      put1.add(family, qf3, ts1, value);
+      table.put(put1);
+      table.flushCommits();
+      admin.flush(TN);
+      Scan scan = new Scan();
+      scan.setMaxVersions(4);
+      ResultScanner results = table.getScanner(scan);
+      int count = 0;
+      for (Result res : results) {
+        for (Cell cell : res.listCells()) {
+          // Compare raw bytes: the random value is not valid text, so String decoding is lossy
+          Assert.assertArrayEquals(value, CellUtil.cloneValue(cell));
+          count++;
+        }
+      }
+      results.close();
+      Assert.assertEquals(3, count);
+    } finally {
+      table.close();
+      admin.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
new file mode 100644
index 0000000..f6511f7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestMobFile extends TestCase {
+  static final Log LOG = LogFactory.getLog(TestMobFile.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private Configuration conf = TEST_UTIL.getConfiguration();
+  private CacheConfig cacheConf =  new CacheConfig(conf);
+  private final String TABLE = "tableName";
+  private final String FAMILY = "familyName";
+
+  /** Seeks the first, last, a random, a too-small and a too-large row in a mob file. */
+  @Test
+  public void testReadKeyValue() throws Exception {
+    String caseName = getName();
+    FileSystem fileSystem = FileSystem.get(conf);
+    Path familyDir = new Path(new Path(FSUtils.getRootDir(conf), TABLE), FAMILY);
+    HFileContext fileContext = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    StoreFile.Writer storeWriter = new StoreFile.WriterBuilder(conf, cacheConf, fileSystem)
+        .withOutputDir(familyDir)
+        .withFileContext(fileContext)
+        .build();
+    MobTestUtil.writeStoreFile(storeWriter, caseName);
+
+    // Wrap the freshly written store file; it is opened with BloomType.NONE
+    MobFile mobFile = new MobFile(
+        new StoreFile(fileSystem, storeWriter.getPath(), conf, cacheConf, BloomType.NONE));
+    byte[] cf = Bytes.toBytes(caseName);
+    byte[] cq = Bytes.toBytes(caseName);
+
+    // Exact seek to the first row in the file
+    byte[] firstRow = Bytes.toBytes("aa");
+    KeyValue expected = new KeyValue(firstRow, cf, cq, Long.MAX_VALUE, Type.Put, firstRow);
+    KeyValue probe = expected.createKeyOnly(false);
+    KeyValue found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // Exact seek to the last row in the file
+    byte[] lastRow = Bytes.toBytes("zz");
+    expected = new KeyValue(lastRow, cf, cq, Long.MAX_VALUE, Type.Put, lastRow);
+    probe = expected.createKeyOnly(false);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // Exact seek to a randomly chosen two-letter row
+    byte[] anyRow = Bytes.toBytes(MobTestUtil.generateRandomString(2));
+    expected = new KeyValue(anyRow, cf, cq, Long.MAX_VALUE, Type.Put, anyRow);
+    probe = expected.createKeyOnly(false);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // A row sorting before "aa" is expected to land on the first row
+    byte[] beforeFirst = Bytes.toBytes("a1");
+    expected = new KeyValue(firstRow, cf, cq, Long.MAX_VALUE, Type.Put, firstRow);
+    probe = new KeyValue(beforeFirst, cf, cq, Long.MAX_VALUE, Type.Put, beforeFirst);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    MobTestUtil.assertKeyValuesEquals(expected, found);
+
+    // A row sorting after "zz" is expected to find nothing
+    byte[] afterLast = Bytes.toBytes("z{");
+    probe = new KeyValue(afterLast, cf, cq, Long.MAX_VALUE, Type.Put, afterLast);
+    found = KeyValueUtil.ensureKeyValue(mobFile.readCell(probe, false));
+    assertNull(found);
+  }
+
+  /** Checks that a MobFile hands out a non-null StoreFileScanner. */
+  @Test
+  public void testGetScanner() throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    Path outputDir = new Path(new Path(FSUtils.getRootDir(conf), TABLE), FAMILY);
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
+    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+        .withOutputDir(outputDir).withFileContext(meta).build();
+    MobTestUtil.writeStoreFile(writer, getName());
+    MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
+        conf, cacheConf, BloomType.NONE));
+    // Fetch once and close it; presumably each getScanner() call opens a new scanner
+    Object scanner = mobFile.getScanner();
+    assertNotNull(scanner);
+    assertTrue(scanner instanceof StoreFileScanner);
+    ((StoreFileScanner) scanner).close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
new file mode 100644
index 0000000..9478544
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
@@ -0,0 +1,206 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.Date;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@link MobFileCache}: opening mob files through the cache, the reference counts of
+ * the returned {@link CachedMobFile}s, eviction of a single file and cache-wide eviction.
+ */
+@Category(SmallTests.class)
+public class TestMobFileCache extends TestCase {
+  static final Log LOG = LogFactory.getLog(TestMobFileCache.class);
+  private HBaseTestingUtility UTIL;
+  private HRegion region;
+  private Configuration conf;
+  // Set by createMobStoreFile(Configuration, String) and passed to MobFileCache.openFile.
+  private MobCacheConfig mobCacheConf;
+  private MobFileCache mobFileCache;
+  // Used for the mob writer and for the date directory the committed file is placed in.
+  private Date currentDate = new Date();
+  // Value stored under MobConstants.MOB_FILE_CACHE_SIZE_KEY by the test.
+  private final String TEST_CACHE_SIZE = "2";
+  private final int EXPECTED_CACHE_SIZE_ZERO = 0;
+  private final int EXPECTED_CACHE_SIZE_ONE = 1;
+  private final int EXPECTED_CACHE_SIZE_TWO = 2;
+  private final int EXPECTED_CACHE_SIZE_THREE = 3;
+  private final long EXPECTED_REFERENCE_ONE = 1;
+  private final long EXPECTED_REFERENCE_TWO = 2;
+
+  private final String TABLE = "tableName";
+  private final String FAMILY1 = "family1";
+  private final String FAMILY2 = "family2";
+  private final String FAMILY3 = "family3";
+
+  private final byte[] ROW = Bytes.toBytes("row");
+  private final byte[] ROW2 = Bytes.toBytes("row2");
+  private final byte[] VALUE = Bytes.toBytes("value");
+  private final byte[] VALUE2 = Bytes.toBytes("value2");
+  private final byte[] QF1 = Bytes.toBytes("qf1");
+  private final byte[] QF2 = Bytes.toBytes("qf2");
+  private final byte[] QF3 = Bytes.toBytes("qf3");
+
+  /**
+   * Creates a local region whose table has three MOB-enabled families (threshold 0).
+   */
+  @Override
+  public void setUp() throws Exception {
+    UTIL = HBaseTestingUtility.createLocalHTU();
+    conf = UTIL.getConfiguration();
+    HTableDescriptor htd = UTIL.createTableDescriptor("testMobFileCache");
+    HColumnDescriptor hcd1 = new HColumnDescriptor(FAMILY1);
+    hcd1.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd1.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+    HColumnDescriptor hcd2 = new HColumnDescriptor(FAMILY2);
+    hcd2.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd2.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+    HColumnDescriptor hcd3 = new HColumnDescriptor(FAMILY3);
+    hcd3.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd3.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+    htd.addFamily(hcd1);
+    htd.addFamily(hcd2);
+    htd.addFamily(hcd3);
+    region = UTIL.createLocalHRegion(htd, null, null);
+  }
+
+  /**
+   * Closes the region and removes the test data directory. The directory is removed even
+   * when closing the region fails, so a failed close does not leave test data behind.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      region.close();
+    } finally {
+      region.getFilesystem().delete(UTIL.getDataTestDir(), true);
+    }
+  }
+
+  /**
+   * Create the mob store file using a freshly created HBase configuration.
+   * @param family the name of the column family the file is written for
+   * @return the path of the committed mob file
+   */
+  private Path createMobStoreFile(String family) throws IOException {
+    return createMobStoreFile(HBaseConfiguration.create(), family);
+  }
+
+  /**
+   * Create the mob store file. As a side effect {@link #mobCacheConf} is replaced by a
+   * config built from the given configuration and a MOB-enabled descriptor of the family.
+   * @param conf the configuration the mob cache config is built from
+   * @param family the name of the column family the file is written for
+   * @return the path of the committed mob file
+   */
+  private Path createMobStoreFile(Configuration conf, String family) throws IOException {
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(4);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    mobCacheConf = new MobCacheConfig(conf, hcd);
+    return createMobStoreFile(conf, hcd);
+  }
+
+  /**
+   * Create the mob store file: writes three cells through a temporary mob writer of the
+   * family's store and commits the file into the date directory of that store.
+   * @param conf the configuration (not read by this overload)
+   * @param hcd the descriptor of the column family the file is written for
+   * @return the path of the committed mob file
+   */
+  private Path createMobStoreFile(Configuration conf, HColumnDescriptor hcd)
+      throws IOException {
+    // Setting up a Store
+    // NOTE(review): htd is built but never used below; the store comes from the region.
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
+    htd.addFamily(hcd);
+    HMobStore mobStore = (HMobStore) region.getStore(hcd.getName());
+    KeyValue key1 = new KeyValue(ROW, hcd.getName(), QF1, 1, VALUE);
+    KeyValue key2 = new KeyValue(ROW, hcd.getName(), QF2, 1, VALUE);
+    KeyValue key3 = new KeyValue(ROW2, hcd.getName(), QF3, 1, VALUE2);
+    KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
+    int maxKeyCount = keys.length;
+    HRegionInfo regionInfo = new HRegionInfo();
+    StoreFile.Writer mobWriter = mobStore.createWriterInTmp(currentDate,
+        maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey());
+    Path mobFilePath = mobWriter.getPath();
+    String fileName = mobFilePath.getName();
+    mobWriter.append(key1);
+    mobWriter.append(key2);
+    mobWriter.append(key3);
+    mobWriter.close();
+    // Move the temporary file into the directory named after the formatted date.
+    String targetPathName = MobUtils.formatDate(currentDate);
+    Path targetPath = new Path(mobStore.getPath(), targetPathName);
+    mobStore.commitFile(mobFilePath, targetPath);
+    return new Path(targetPath, fileName);
+  }
+
+  /**
+   * Opens mob files through a cache configured with size 2 and checks the cache size and the
+   * reference counts after opening, after evicting one file and after a cache-wide evict.
+   */
+  @Test
+  public void testMobFileCache() throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    conf.set(MobConstants.MOB_FILE_CACHE_SIZE_KEY, TEST_CACHE_SIZE);
+    mobFileCache = new MobFileCache(conf);
+    Path file1Path = createMobStoreFile(FAMILY1);
+    Path file2Path = createMobStoreFile(FAMILY2);
+    Path file3Path = createMobStoreFile(FAMILY3);
+
+    // Before open one file by the MobFileCache
+    assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
+    // Open one file by the MobFileCache
+    CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
+        fs, file1Path, mobCacheConf);
+    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+    assertNotNull(cachedMobFile1);
+    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
+
+    // The evict is also managed by a schedule thread pool.
+    // And its check period is set as 3600 seconds by default.
+    // This evict should get the lock at the most time
+    mobFileCache.evict();  // Cache is not full, so nothing is expected to be evicted
+    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
+
+    mobFileCache.evictFile(file1Path.getName());  // Evict one file
+    assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
+    assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
+
+    cachedMobFile1.close();  // Close the cached mob file
+
+    // Reopen three cached file
+    cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
+        fs, file1Path, mobCacheConf);
+    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+    CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile(
+        fs, file2Path, mobCacheConf);
+    assertEquals(EXPECTED_CACHE_SIZE_TWO, mobFileCache.getCacheSize());
+    CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile(
+        fs, file3Path, mobCacheConf);
+    // Before the evict: all three files are cached and referenced by cache and test
+    assertEquals(EXPECTED_CACHE_SIZE_THREE, mobFileCache.getCacheSize());
+    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
+    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile2.getReferenceCount());
+    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
+    // Evict the cache: files 1 and 2 should be dropped, only file 3 stays cached
+    mobFileCache.evict();
+    assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+    assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
+    assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile2.getReferenceCount());
+    assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
+
+    // Release the references this test still holds so no file handle outlives the test.
+    cachedMobFile1.close();
+    cachedMobFile2.close();
+    cachedMobFile3.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java
new file mode 100644
index 0000000..9a6cf7f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@link MobFileName}: creation from its parts and from a file name, the accessors,
+ * and {@code equals}/{@code hashCode}.
+ */
+@Category(SmallTests.class)
+public class TestMobFileName extends TestCase {
+
+  private String uuid;
+  private Date date;
+  private String dateStr;
+  private byte[] startKey;
+
+  /**
+   * Prepares a dash-free random UUID, the current date formatted by
+   * {@code MobUtils.formatDate} and a random 4-byte start key.
+   */
+  @Override
+  public void setUp() {
+    Random random = new Random();
+    uuid = UUID.randomUUID().toString().replaceAll("-", "");
+    date = new Date();
+    dateStr = MobUtils.formatDate(date);
+    startKey = Bytes.toBytes(random.nextInt());
+  }
+
+  /**
+   * Two names created from the same parts must have the same hash code even though they
+   * are separate objects.
+   */
+  @Test
+  public void testHashCode() {
+    MobFileName first = MobFileName.create(startKey, dateStr, uuid);
+    MobFileName second = MobFileName.create(startKey, dateStr, uuid);
+    assertEquals(first.hashCode(), second.hashCode());
+    // Compare the instances, not the hash codes: assertNotSame on two int hash codes only
+    // compares the identity of the autoboxed Integers, which says nothing about MobFileName.
+    // NOTE(review): assumes create() returns a fresh instance per call - confirm.
+    assertNotSame(first, second);
+  }
+
+  /**
+   * A name parsed back from its own file name must equal the original.
+   */
+  @Test
+  public void testCreate() {
+    MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
+    assertEquals(mobFileName, MobFileName.create(mobFileName.getFileName()));
+  }
+
+  /**
+   * The start key is exposed as the MD5 hex of the key bytes, and the file name is the
+   * concatenation of that hex string, the date string and the uuid.
+   */
+  @Test
+  public void testGet() {
+    MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
+    String startKeyHex = MD5Hash.getMD5AsHex(startKey, 0, startKey.length);
+    assertEquals(startKeyHex, mobFileName.getStartKey());
+    assertEquals(dateStr, mobFileName.getDate());
+    // Expected value first, so a failure message reports the two values the right way round.
+    assertEquals(startKeyHex + dateStr + uuid, mobFileName.getFileName());
+  }
+
+  /**
+   * A name equals itself and an equally built name, but not an object of another type.
+   */
+  @Test
+  public void testEquals() {
+    MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
+    assertTrue(mobFileName.equals(mobFileName));
+    assertFalse(mobFileName.equals(this));
+    assertTrue(mobFileName.equals(MobFileName.create(startKey, dateStr, uuid)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
new file mode 100644
index 0000000..ed4b703
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests what happens to mob files when a table is deleted, both for a table with a
+ * MOB-enabled family and for a table without one. Runs against a one-node mini cluster.
+ */
+@Category(MediumTests.class)
+public class TestDeleteMobTable {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static byte[] FAMILY = Bytes.toBytes("family");
+  private final static byte[] QF = Bytes.toBytes("qualifier");
+  private static Random random = new Random();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Generate the mob value.
+   *
+   * @param size
+   *          the size of the value
+   * @return the mob value generated
+   */
+  private static byte[] generateMobValue(int size) {
+    byte[] mobVal = new byte[size];
+    random.nextBytes(mobVal);
+    return mobVal;
+  }
+
+  /**
+   * Closes the table (if still open) and then the admin. The admin is closed even when
+   * closing the table fails.
+   *
+   * @param table
+   *          the table to close, or null if there is nothing left to close
+   * @param admin
+   *          the admin to close, or null if it was never created
+   * @throws IOException
+   *           if closing either resource fails
+   */
+  private static void closeTableAndAdmin(HTable table, HBaseAdmin admin) throws IOException {
+    try {
+      if (table != null) {
+        table.close();
+      }
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+    }
+  }
+
+  /**
+   * Writes one mob cell, flushes it, deletes the table and checks that the mob file has
+   * moved from the mob family directory to the archive and the mob table directory is gone.
+   */
+  @Test
+  public void testDeleteMobTable() throws Exception {
+    byte[] tableName = Bytes.toBytes("testDeleteMobTable");
+    TableName tn = TableName.valueOf(tableName);
+    HTableDescriptor htd = new HTableDescriptor(tn);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+    htd.addFamily(hcd);
+    HBaseAdmin admin = null;
+    HTable table = null;
+    try {
+      admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+      admin.createTable(htd);
+      table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+      byte[] value = generateMobValue(10);
+
+      byte[] row = Bytes.toBytes("row");
+      Put put = new Put(row);
+      put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+      table.put(put);
+
+      table.flushCommits();
+      admin.flush(tableName);
+
+      // the mob file exists
+      Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+      String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
+      Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+      Assert.assertTrue(mobTableDirExist(tn));
+      table.close();
+      // Already closed; the finally block must not close it a second time.
+      table = null;
+
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+
+      Assert.assertFalse(admin.tableExists(tn));
+      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+      Assert.assertFalse(mobTableDirExist(tn));
+    } finally {
+      // Also releases the table when an assertion or call above failed before table.close().
+      closeTableAndAdmin(table, admin);
+    }
+  }
+
+  /**
+   * Same scenario for a family that is not MOB-enabled: no mob file, no archived mob file
+   * and no mob table directory may exist, neither before nor after deleting the table.
+   */
+  @Test
+  public void testDeleteNonMobTable() throws Exception {
+    byte[] tableName = Bytes.toBytes("testDeleteNonMobTable");
+    TableName tn = TableName.valueOf(tableName);
+    HTableDescriptor htd = new HTableDescriptor(tn);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    htd.addFamily(hcd);
+    HBaseAdmin admin = null;
+    HTable table = null;
+    try {
+      admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+      admin.createTable(htd);
+      table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+      byte[] value = generateMobValue(10);
+
+      byte[] row = Bytes.toBytes("row");
+      Put put = new Put(row);
+      put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+      table.put(put);
+
+      table.flushCommits();
+      admin.flush(tableName);
+      table.close();
+      // Already closed; the finally block must not close it a second time.
+      table = null;
+
+      // the mob file doesn't exist
+      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertFalse(mobTableDirExist(tn));
+
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+
+      Assert.assertFalse(admin.tableExists(tn));
+      Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+      Assert.assertFalse(mobTableDirExist(tn));
+    } finally {
+      // Also releases the table when a call above failed before table.close().
+      closeTableAndAdmin(table, admin);
+    }
+  }
+
+  /**
+   * Counts the entries in the mob family directory of the table.
+   *
+   * @return the number of entries, or 0 if the directory does not exist
+   */
+  private int countMobFiles(TableName tn, String familyName) throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName);
+    if (fs.exists(mobFileDir)) {
+      return fs.listStatus(mobFileDir).length;
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Counts the entries in the store archive directory of the table's mob region.
+   *
+   * @return the number of entries, or 0 if the directory does not exist
+   */
+  private int countArchiveMobFiles(TableName tn, String familyName)
+      throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
+        MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
+    if (fs.exists(storePath)) {
+      return fs.listStatus(storePath).length;
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Checks whether the table directory below the mob home directory exists.
+   */
+  private boolean mobTableDirExist(TableName tn) throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
+    return fs.exists(tableDir);
+  }
+
+  /**
+   * Checks whether the named file exists in the store archive directory of the mob region.
+   */
+  private boolean mobArchiveExist(TableName tn, String familyName, String fileName)
+      throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
+        MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
+    return fs.exists(new Path(storePath, fileName));
+  }
+
+  /**
+   * Raw-scans the table, asserts that it holds exactly one row and that the mob file named
+   * in that row's value exists in the mob family directory.
+   *
+   * @return the mob file name read from the cell value (the bytes after the leading long)
+   */
+  private String assertHasOneMobRow(HTable table, TableName tn, String familyName)
+      throws IOException {
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    try {
+      Result r = rs.next();
+      Assert.assertNotNull(r);
+      byte[] value = r.getValue(FAMILY, QF);
+      // Skip the leading long in the raw value; the remainder is read as the file name.
+      String fileName = Bytes.toString(value, Bytes.SIZEOF_LONG,
+          value.length - Bytes.SIZEOF_LONG);
+      Path filePath = new Path(
+          MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName);
+      FileSystem fs = TEST_UTIL.getTestFileSystem();
+      Assert.assertTrue(fs.exists(filePath));
+      r = rs.next();
+      Assert.assertNull(r);
+      return fileName;
+    } finally {
+      // Release the scanner whether or not the assertions above passed.
+      rs.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
new file mode 100644
index 0000000..c253611
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -0,0 +1,471 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+@Category(MediumTests.class)
+public class TestHMobStore {
+  public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
+  // Supplies the running test's method name, used to build a per-test base directory.
+  @Rule public TestName name = new TestName();
+
+  // Store and region under test; both are created by the init(...) helpers.
+  private HMobStore store;
+  private HRegion region;
+  private HColumnDescriptor hcd;
+  // Only assigned by init(Configuration, HColumnDescriptor).
+  private FileSystem fs;
+  // Table, family, row, qualifier and value bytes shared by the tests.
+  private byte [] table = Bytes.toBytes("table");
+  private byte [] family = Bytes.toBytes("family");
+  private byte [] row = Bytes.toBytes("row");
+  private byte [] row2 = Bytes.toBytes("row2");
+  private byte [] qf1 = Bytes.toBytes("qf1");
+  private byte [] qf2 = Bytes.toBytes("qf2");
+  private byte [] qf3 = Bytes.toBytes("qf3");
+  private byte [] qf4 = Bytes.toBytes("qf4");
+  private byte [] qf5 = Bytes.toBytes("qf5");
+  private byte [] qf6 = Bytes.toBytes("qf6");
+  private byte[] value = Bytes.toBytes("value");
+  private byte[] value2 = Bytes.toBytes("value2");
+  // Path of the mob file written by init(Configuration, HColumnDescriptor).
+  private Path mobFilePath;
+  private Date currentDate = new Date();
+  // Cells carrying a value-length prefix plus the mob file reference; built in
+  // init(Configuration, HColumnDescriptor).
+  private KeyValue seekKey1;
+  private KeyValue seekKey2;
+  private KeyValue seekKey3;
+  // Qualifiers requested through the shared Get; filled in setUp().
+  private NavigableSet<byte[]> qualifiers =
+    new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+  // Cells the scans are expected to return; filled in setUp().
+  private List<Cell> expected = new ArrayList<Cell>();
+  // NOTE(review): not referenced in the part of the class visible here - confirm it is used.
+  private long id = System.currentTimeMillis();
+  private Get get = new Get(row);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Prefix of the per-test base directory; init(...) appends the method name to it.
+  private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
+
+  /**
+   * Setup: registers the qualifiers qf1, qf3 and qf5, and for each of them adds the cell
+   * expected back from a scan and the matching column on the shared Get.
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    qualifiers.add(qf1);
+    qualifiers.add(qf3);
+    qualifiers.add(qf5);
+
+    // Ask for all versions once; the setting does not depend on the qualifier being added.
+    get.setMaxVersions();
+    Iterator<byte[]> iter = qualifiers.iterator();
+    while(iter.hasNext()){
+      byte [] next = iter.next();
+      expected.add(new KeyValue(row, family, next, 1, value));
+      get.addColumn(family, next);
+    }
+  }
+
+  /**
+   * Initializes region and store with the default column family of this test: MOB enabled,
+   * a MOB threshold of 3 and at most 4 versions.
+   */
+  private void init(String methodName, Configuration conf, boolean testStore)
+      throws IOException {
+    this.hcd = new HColumnDescriptor(family);
+    byte[] mobEnabled = Bytes.toBytes(Boolean.TRUE);
+    this.hcd.setValue(MobConstants.IS_MOB, mobEnabled);
+    byte[] mobThreshold = Bytes.toBytes(3L);
+    this.hcd.setValue(MobConstants.MOB_THRESHOLD, mobThreshold);
+    this.hcd.setMaxVersions(4);
+    init(methodName, conf, this.hcd, testStore);
+  }
+
+  /**
+   * Initializes region and store for the given column family, using a new descriptor of
+   * the test table.
+   */
+  private void init(String methodName, Configuration conf,
+      HColumnDescriptor hcd, boolean testStore) throws IOException {
+    TableName tableName = TableName.valueOf(table);
+    init(methodName, conf, new HTableDescriptor(tableName), hcd, testStore);
+  }
+
+  private void init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, boolean testStore) throws IOException {
+    //Setting up tje Region and Store
+    Path basedir = new Path(DIR+methodName);
+    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
+    String logName = "logs";
+    Path logdir = new Path(basedir, logName);
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(logdir, true);
+
+    htd.addFamily(hcd);
+    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+    HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
+    region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
+    store = new HMobStore(region, hcd, conf);
+    if(testStore) {
+      init(conf, hcd);
+    }
+  }
+
+  /**
+   * Writes three cells into a temporary mob file of the store and builds the three seek
+   * keys whose values are the cell's value length (as a long) followed by the reference
+   * "formatted date / mob file name".
+   */
+  private void init(Configuration conf, HColumnDescriptor hcd)
+      throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    fs = FileSystem.get(conf);
+    String familyName = Bytes.toString(family);
+    Path homePath = new Path(rootDir, familyName + Path.SEPARATOR
+        + Bytes.toString(family));
+    fs.mkdirs(homePath);
+
+    KeyValue[] cells = new KeyValue[] {
+        new KeyValue(row, family, qf1, 1, value),
+        new KeyValue(row, family, qf2, 1, value),
+        new KeyValue(row2, family, qf3, 1, value2) };
+    int cellCount = cells.length;
+    StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate,
+        cellCount, hcd.getCompactionCompression(), region.getStartKey());
+    mobFilePath = mobWriter.getPath();
+
+    for (KeyValue cell : cells) {
+      mobWriter.append(cell);
+    }
+    mobWriter.close();
+
+    // Keep the lengths as long so that each prefix is serialized as 8 bytes.
+    long firstLength = cells[0].getValueLength();
+    long secondLength = cells[1].getValueLength();
+    long thirdLength = cells[2].getValueLength();
+
+    String dateDirName = MobUtils.formatDate(currentDate);
+    String reference = dateDirName + Path.SEPARATOR + mobFilePath.getName();
+    byte[] referenceBytes = Bytes.toBytes(reference);
+    seekKey1 = new KeyValue(row, family, qf1, Long.MAX_VALUE,
+        Bytes.add(Bytes.toBytes(firstLength), referenceBytes));
+    seekKey2 = new KeyValue(row, family, qf2, Long.MAX_VALUE,
+        Bytes.add(Bytes.toBytes(secondLength), referenceBytes));
+    seekKey3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE,
+        Bytes.add(Bytes.toBytes(thirdLength), referenceBytes));
+  }
+
+  /**
+   * Getting data from memstore: adds six cells to the store without flushing and checks
+   * that a scan built from the shared Get returns exactly the expected cells.
+   * @throws IOException
+   */
+  @Test
+  public void testGetFromMemStore() throws IOException {
+    final Configuration conf = HBaseConfiguration.create();
+    init(name.getMethodName(), conf, false);
+
+    //Put data in memstore
+    this.store.add(new KeyValue(row, family, qf1, 1, value));
+    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    this.store.add(new KeyValue(row, family, qf3, 1, value));
+    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    this.store.add(new KeyValue(row, family, qf5, 1, value));
+    this.store.add(new KeyValue(row, family, qf6, 1, value));
+
+    Scan scan = new Scan(get);
+    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+        scan.getFamilyMap().get(store.getFamily().getName()),
+        0);
+
+    List<Cell> results = new ArrayList<Cell>();
+    try {
+      scanner.next(results);
+      Collections.sort(results, KeyValue.COMPARATOR);
+    } finally {
+      // Close the scanner even when reading or sorting fails.
+      scanner.close();
+    }
+
+    //Compare
+    Assert.assertEquals(expected.size(), results.size());
+    for(int i=0; i<results.size(); i++) {
+      // Verify the values
+      Assert.assertEquals(expected.get(i), results.get(i));
+    }
+  }
+
+  /**
+   * Getting MOB data from files: adds six cells in three batches, flushing after each
+   * batch, and checks that a scan built from the shared Get returns the expected cells.
+   * @throws IOException
+   */
+  @Test
+  public void testGetFromFiles() throws IOException {
+    final Configuration conf = TEST_UTIL.getConfiguration();
+    init(name.getMethodName(), conf, false);
+
+    //Put data in memstore
+    this.store.add(new KeyValue(row, family, qf1, 1, value));
+    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    //flush
+    flush(1);
+
+    //Add more data
+    this.store.add(new KeyValue(row, family, qf3, 1, value));
+    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    //flush
+    flush(2);
+
+    //Add more data
+    this.store.add(new KeyValue(row, family, qf5, 1, value));
+    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    //flush
+    flush(3);
+
+    Scan scan = new Scan(get);
+    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+        scan.getFamilyMap().get(store.getFamily().getName()),
+        0);
+
+    List<Cell> results = new ArrayList<Cell>();
+    try {
+      scanner.next(results);
+      Collections.sort(results, KeyValue.COMPARATOR);
+    } finally {
+      // Close the scanner even when reading or sorting fails.
+      scanner.close();
+    }
+
+    //Compare
+    Assert.assertEquals(expected.size(), results.size());
+    for(int i=0; i<results.size(); i++) {
+      Assert.assertEquals(expected.get(i), results.get(i));
+    }
+  }
+
+  /**
+   * Verifies that a scan carrying the MOB_SCAN_RAW attribute returns mob reference
+   * cells after all data has been flushed to files.
+   * @throws IOException
+   */
+  @Test
+  public void testGetReferencesFromFiles() throws IOException {
+    final Configuration conf = HBaseConfiguration.create();
+    init(name.getMethodName(), conf, false);
+
+    // First pair of cells, flushed into store file #1
+    this.store.add(new KeyValue(row, family, qf1, 1, value));
+    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    flush(1);
+
+    // Second pair of cells, flushed into store file #2
+    this.store.add(new KeyValue(row, family, qf3, 1, value));
+    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    flush(2);
+
+    // Third pair of cells, flushed into store file #3
+    this.store.add(new KeyValue(row, family, qf5, 1, value));
+    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    flush(3);
+
+    // Raw mob scan: ask for the reference cells themselves.
+    Scan rawScan = new Scan(get);
+    rawScan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    InternalScanner rawScanner = (InternalScanner) store.getScanner(
+        rawScan, rawScan.getFamilyMap().get(store.getFamily().getName()), 0);
+
+    List<Cell> references = new ArrayList<Cell>();
+    rawScanner.next(references);
+    Collections.sort(references, KeyValue.COMPARATOR);
+    rawScanner.close();
+
+    // Same number of cells as expected, and every one of them is a mob reference.
+    Assert.assertEquals(expected.size(), references.size());
+    for (Cell reference : references) {
+      Assert.assertTrue(MobUtils.isMobReferenceCell(reference));
+    }
+  }
+
+  /**
+   * Getting data from memstore and files
+   * @throws IOException
+   */
+  @Test
+  public void testGetFromMemStoreAndFiles() throws IOException {
+
+    final Configuration conf = HBaseConfiguration.create();
+
+    init(name.getMethodName(), conf, false);
+
+    //Put data in memstore
+    this.store.add(new KeyValue(row, family, qf1, 1, value));
+    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    //flush
+    flush(1);
+
+    //Add more data
+    this.store.add(new KeyValue(row, family, qf3, 1, value));
+    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    //flush
+    flush(2);
+
+    //Add more data
+    this.store.add(new KeyValue(row, family, qf5, 1, value));
+    this.store.add(new KeyValue(row, family, qf6, 1, value));
+
+    Scan scan = new Scan(get);
+    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+        scan.getFamilyMap().get(store.getFamily().getName()),
+        0);
+
+    List<Cell> results = new ArrayList<Cell>();
+    scanner.next(results);
+    Collections.sort(results, KeyValue.COMPARATOR);
+    scanner.close();
+
+    //Compare
+    Assert.assertEquals(expected.size(), results.size());
+    for(int i=0; i<results.size(); i++) {
+      Assert.assertEquals(expected.get(i), results.get(i));
+    }
+  }
+
+  /**
+   * Verifies that the mob threshold configured on the column family is honoured:
+   * with a threshold of 100, a raw mob scan over the flushed data returns the
+   * original cells rather than mob reference cells.
+   * NOTE(review): this presumably relies on the shared test value being smaller
+   * than the threshold - confirm against the fixture.
+   * @throws IOException
+   */
+  @Test
+  public void testMobCellSizeThreshold() throws IOException {
+    final Configuration conf = HBaseConfiguration.create();
+    // Single source of truth for the threshold that is configured and verified below.
+    final long threshold = 100L;
+
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold));
+    hcd.setMaxVersions(4);
+    init(name.getMethodName(), conf, hcd, false);
+
+    //Put data in memstore
+    this.store.add(new KeyValue(row, family, qf1, 1, value));
+    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    //flush
+    flush(1);
+
+    //Add more data
+    this.store.add(new KeyValue(row, family, qf3, 1, value));
+    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    //flush
+    flush(2);
+
+    //Add more data
+    this.store.add(new KeyValue(row, family, qf5, 1, value));
+    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    //flush
+    flush(3);
+
+    Scan scan = new Scan(get);
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+      scan.getFamilyMap().get(store.getFamily().getName()),
+      0);
+
+    List<Cell> results = new ArrayList<Cell>();
+    try {
+      scanner.next(results);
+      Collections.sort(results, KeyValue.COMPARATOR);
+    } finally {
+      // Release the scanner even when reading or sorting fails.
+      scanner.close();
+    }
+
+    //Compare
+    // The family-level threshold does not depend on any cell, so check it once.
+    Assert.assertEquals(threshold, MobUtils.getMobThreshold(store.getFamily()));
+    Assert.assertEquals(expected.size(), results.size());
+    for(int i=0; i<results.size(); i++) {
+      Cell cell = results.get(i);
+      //this is not mob reference cell.
+      Assert.assertFalse(MobUtils.isMobReferenceCell(cell));
+      Assert.assertEquals(expected.get(i), cell);
+    }
+  }
+
+  /**
+   * Commits the prepared mob file to a date-named directory under the store path
+   * and checks that the file shows up at the target location.
+   */
+  @Test
+  public void testCommitFile() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    init(name.getMethodName(), conf, true);
+
+    // Target location: <store path>/<formatted date>/<mob file name>
+    String dateDirName = MobUtils.formatDate(new Date());
+    String relativeTarget = dateDirName + Path.SEPARATOR + mobFilePath.getName();
+    Path targetPath = new Path(store.getPath(), relativeTarget);
+
+    // Start from a clean slate so the existence check below is meaningful.
+    fs.delete(targetPath, true);
+    Assert.assertFalse(fs.exists(targetPath));
+
+    store.commitFile(mobFilePath, targetPath);
+    Assert.assertTrue(fs.exists(targetPath));
+  }
+
+  /**
+   * Commits the prepared mob file and checks that resolving each seek key
+   * yields a cell carrying the expected value.
+   */
+  @Test
+  public void testResolve() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    init(name.getMethodName(), conf, true);
+
+    Path targetPath = new Path(store.getPath(), MobUtils.formatDate(currentDate));
+    store.commitFile(mobFilePath, targetPath);
+
+    // Resolve all three keys first, then compare the values.
+    Cell resolved1 = store.resolve(seekKey1, false);
+    Cell resolved2 = store.resolve(seekKey2, false);
+    Cell resolved3 = store.resolve(seekKey3, false);
+
+    String actual1 = Bytes.toString(CellUtil.cloneValue(resolved1));
+    Assert.assertEquals(Bytes.toString(value), actual1);
+    String actual2 = Bytes.toString(CellUtil.cloneValue(resolved2));
+    Assert.assertEquals(Bytes.toString(value), actual2);
+    String actual3 = Bytes.toString(CellUtil.cloneValue(resolved3));
+    Assert.assertEquals(Bytes.toString(value2), actual3);
+  }
+
+  /**
+   * Snapshots and flushes the memstore, then checks that the store holds the
+   * given number of files and that the memstore is empty.
+   * @param storeFilesSize the number of store files expected after the flush
+   * @throws IOException
+   */
+  private void flush(int storeFilesSize) throws IOException{
+    this.store.snapshot();
+    // Every flush consumes the current id and advances it for the next one.
+    long flushId = id++;
+    flushStore(store, flushId);
+
+    int storeFileCount = this.store.getStorefiles().size();
+    Assert.assertEquals(storeFilesSize, storeFileCount);
+
+    DefaultMemStore memStore = (DefaultMemStore) this.store.memstore;
+    Assert.assertEquals(0, memStore.kvset.size());
+  }
+
+  /**
+   * Drives a complete flush cycle (prepare, flush cache, commit) on the given store.
+   * @param store the store to flush
+   * @param id the id handed to the store when creating the flush context
+   * @throws IOException
+   */
+  private static void flushStore(HMobStore store, long id) throws IOException {
+    StoreFlushContext ctx = store.createFlushContext(id);
+    ctx.prepare();
+    // Each phase gets its own mocked status reporter.
+    MonitoredTask flushStatus = Mockito.mock(MonitoredTask.class);
+    ctx.flushCache(flushStatus);
+    MonitoredTask commitStatus = Mockito.mock(MonitoredTask.class);
+    ctx.commit(commitStatus);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
new file mode 100644
index 0000000..2af97c3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests forward and reversed client scans against tables whose column family is
+ * mob-enabled, running on a single-node mini cluster.
+ */
+@Category(MediumTests.class)
+public class TestMobStoreScanner {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static byte [] row1 = Bytes.toBytes("row1");
+  private final static byte [] family = Bytes.toBytes("family");
+  private final static byte [] qf1 = Bytes.toBytes("qualifier1");
+  private final static byte [] qf2 = Bytes.toBytes("qualifier2");
+  protected final static byte [] qf3 = Bytes.toBytes("qualifier3");
+  private static HTable table;
+  private static HBaseAdmin admin;
+  private static HColumnDescriptor hcd;
+  private static HTableDescriptor desc;
+  private final static Random random = new Random();
+  private final static long defaultThreshold = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      // Release the handles of the last table before the cluster goes away.
+      closeTableAndAdmin();
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Creates a table with a single mob-enabled column family and opens the shared
+   * table and admin handles on it.
+   *
+   * @param threshold the mob threshold to configure on the column family
+   * @param TN the name of the table to create
+   */
+  public void setUp(long threshold, String TN) throws Exception {
+    // Each scenario creates its own table; close the handles of the previous one
+    // so they are not leaked when the static fields are overwritten.
+    closeTableAndAdmin();
+    desc = new HTableDescriptor(TableName.valueOf(TN));
+    hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+    admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), TN);
+  }
+
+  /**
+   * Closes the shared table and admin handles, if they are open.
+   */
+  private static void closeTableAndAdmin() throws IOException {
+    if (table != null) {
+      table.close();
+      table = null;
+    }
+    if (admin != null) {
+      admin.close();
+      admin = null;
+    }
+  }
+
+  /**
+   * Generate the mob value.
+   *
+   * @param size the size of the value
+   * @return the mob value generated
+   */
+  private static byte[] generateMobValue(int size) {
+    byte[] mobVal = new byte[size];
+    random.nextBytes(mobVal);
+    return mobVal;
+  }
+
+  /**
+   * Set the scan attribute
+   *
+   * @param scan the scan to configure
+   * @param reversed if true, scan will be backward order
+   * @param mobScanRaw if true, scan will get the mob reference
+   */
+  public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
+    scan.setReversed(reversed);
+    scan.setMaxVersions(4);
+    if(mobScanRaw) {
+      scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    }
+  }
+
+  @Test
+  public void testMobStoreScanner() throws Exception {
+    testGetFromFiles(false);
+    testGetFromMemStore(false);
+    testGetReferences(false);
+    testMobThreshold(false);
+  }
+
+  @Test
+  public void testReversedMobStoreScanner() throws Exception {
+    testGetFromFiles(true);
+    testGetFromMemStore(true);
+    testGetReferences(true);
+    testMobThreshold(true);
+  }
+
+  /**
+   * Writes three cells, flushes them, and checks a normal scan returns the written value.
+   */
+  public void testGetFromFiles(boolean reversed) throws Exception {
+    String TN = "testGetFromFiles" + reversed;
+    setUp(defaultThreshold, TN);
+    byte [] value = generateMobValue((int)defaultThreshold+1);
+
+    putRow(value, value, value);
+    flushTable(TN);
+
+    assertScanReturnsValue(reversed, value);
+  }
+
+  /**
+   * Writes three cells without flushing and checks a normal scan returns the written value.
+   */
+  public void testGetFromMemStore(boolean reversed) throws Exception {
+    String TN = "testGetFromMemStore" + reversed;
+    setUp(defaultThreshold, TN);
+    byte [] value = generateMobValue((int)defaultThreshold+1);
+
+    putRow(value, value, value);
+
+    assertScanReturnsValue(reversed, value);
+  }
+
+  /**
+   * Writes three cells above the threshold, flushes them, and checks a raw mob scan
+   * returns a mob reference for each of them.
+   */
+  public void testGetReferences(boolean reversed) throws Exception {
+    String TN = "testGetReferences" + reversed;
+    setUp(defaultThreshold, TN);
+    byte [] value = generateMobValue((int)defaultThreshold+1);
+
+    putRow(value, value, value);
+    flushTable(TN);
+
+    Scan scan = new Scan();
+    setScan(scan, reversed, true);
+
+    ResultScanner results = table.getScanner(scan);
+    try {
+      int count = 0;
+      for (Result res : results) {
+        List<Cell> cells = res.listCells();
+        for(Cell cell : cells) {
+          // Verify the value
+          assertIsMobReference(cell, row1, family, value, TN);
+          count++;
+        }
+      }
+      Assert.assertEquals(3, count);
+    } finally {
+      // Close the scanner even when an assertion above fails.
+      results.close();
+    }
+  }
+
+  /**
+   * Writes values one byte below, exactly at, and one byte above the threshold and
+   * checks that only the value above the threshold comes back as a mob reference.
+   */
+  public void testMobThreshold(boolean reversed) throws Exception {
+    String TN = "testMobThreshold" + reversed;
+    setUp(defaultThreshold, TN);
+    byte [] valueLess = generateMobValue((int)defaultThreshold-1);
+    byte [] valueEqual = generateMobValue((int)defaultThreshold);
+    byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
+
+    putRow(valueLess, valueEqual, valueGreater);
+    flushTable(TN);
+
+    Scan scan = new Scan();
+    setScan(scan, reversed, true);
+
+    Cell cellLess = null;
+    Cell cellEqual = null;
+    Cell cellGreater = null;
+    ResultScanner results = table.getScanner(scan);
+    try {
+      int count = 0;
+      for (Result res : results) {
+        List<Cell> cells = res.listCells();
+        for(Cell cell : cells) {
+          // Sort each cell into its slot by qualifier
+          byte[] qualifier = CellUtil.cloneQualifier(cell);
+          if (Bytes.equals(qualifier, qf1)) {
+            cellLess = cell;
+          }
+          if (Bytes.equals(qualifier, qf2)) {
+            cellEqual = cell;
+          }
+          if (Bytes.equals(qualifier, qf3)) {
+            cellGreater = cell;
+          }
+          count++;
+        }
+      }
+      Assert.assertEquals(3, count);
+      // Fail with a readable message instead of a NullPointerException below.
+      Assert.assertNotNull("No cell found for qualifier1", cellLess);
+      Assert.assertNotNull("No cell found for qualifier2", cellEqual);
+      Assert.assertNotNull("No cell found for qualifier3", cellGreater);
+      assertNotMobReference(cellLess, row1, family, valueLess);
+      assertNotMobReference(cellEqual, row1, family, valueEqual);
+      assertIsMobReference(cellGreater, row1, family, valueGreater, TN);
+    } finally {
+      // Close the scanner even when an assertion above fails.
+      results.close();
+    }
+  }
+
+  /**
+   * Puts one row with three cells: qf1 gets the newest timestamp and qf3 the oldest.
+   */
+  private static void putRow(byte[] valueQf1, byte[] valueQf2, byte[] valueQf3)
+      throws IOException {
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    Put put1 = new Put(row1);
+    put1.add(family, qf1, ts3, valueQf1);
+    put1.add(family, qf2, ts2, valueQf2);
+    put1.add(family, qf3, ts1, valueQf3);
+    table.put(put1);
+  }
+
+  /**
+   * Pushes buffered client writes to the server and flushes the table.
+   */
+  private static void flushTable(String TN) throws Exception {
+    table.flushCommits();
+    admin.flush(TN);
+  }
+
+  /**
+   * Runs a normal (non-raw) scan and asserts that it returns exactly three cells,
+   * each carrying the given value.
+   */
+  private void assertScanReturnsValue(boolean reversed, byte[] value) throws IOException {
+    Scan scan = new Scan();
+    setScan(scan, reversed, false);
+
+    ResultScanner results = table.getScanner(scan);
+    try {
+      int count = 0;
+      for (Result res : results) {
+        List<Cell> cells = res.listCells();
+        for(Cell cell : cells) {
+          // Compare byte-wise: decoding random bytes to a String is lossy.
+          Assert.assertArrayEquals(value, CellUtil.cloneValue(cell));
+          count++;
+        }
+      }
+      Assert.assertEquals(3, count);
+    } finally {
+      // Close the scanner even when an assertion above fails.
+      results.close();
+    }
+  }
+
+  /**
+   * Assert the value is not stored in mob.
+   */
+  private static void assertNotMobReference(Cell cell, byte[] row, byte[] family,
+      byte[] value) throws IOException {
+    Assert.assertArrayEquals(row, CellUtil.cloneRow(cell));
+    Assert.assertArrayEquals(family, CellUtil.cloneFamily(cell));
+    Assert.assertArrayEquals(value, CellUtil.cloneValue(cell));
+  }
+
+  /**
+   * Assert the value is stored in mob: the cell value differs from the written value
+   * and names a file that exists under the mob family directory of the table.
+   */
+  private static void assertIsMobReference(Cell cell, byte[] row, byte[] family,
+      byte[] value, String TN) throws IOException {
+    Assert.assertArrayEquals(row, CellUtil.cloneRow(cell));
+    Assert.assertArrayEquals(family, CellUtil.cloneFamily(cell));
+    byte[] referenceValue = CellUtil.cloneValue(cell);
+    Assert.assertFalse(Bytes.equals(value, referenceValue));
+    // NOTE(review): the first 8 bytes are skipped before the file name - presumably
+    // a fixed-size prefix in the reference value; confirm against the writer.
+    final int fileNameOffset = 8;
+    String fileName = Bytes.toString(referenceValue, fileNameOffset,
+        referenceValue.length - fileNameOffset);
+    Path mobFamilyPath;
+    mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(TN)), hcd.getNameAsString());
+    Path targetPath = new Path(mobFamilyPath, fileName);
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    Assert.assertTrue(fs.exists(targetPath));
+  }
+}


[2/2] git commit: HBASE-11643 Read and write MOB in HBase (Jingcheng Du)

Posted by jm...@apache.org.
HBASE-11643 Read and write MOB in HBase (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5c14f749
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5c14f749
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5c14f749

Branch: refs/heads/hbase-11339
Commit: 5c14f749b3196fe5d9e1efe6dd5d6d7356cc72d0
Parents: bcfc6d6
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Thu Sep 4 05:41:21 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Thu Sep 4 05:41:21 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/TagType.java   |   4 +
 .../src/main/resources/hbase-default.xml        |  29 ++
 .../master/handler/DeleteTableHandler.java      |  30 ++
 .../apache/hadoop/hbase/mob/CachedMobFile.java  | 114 +++++
 .../hbase/mob/DefaultMobStoreFlusher.java       | 217 +++++++++
 .../apache/hadoop/hbase/mob/MobCacheConfig.java |  56 +++
 .../apache/hadoop/hbase/mob/MobConstants.java   |  61 +++
 .../org/apache/hadoop/hbase/mob/MobFile.java    | 140 ++++++
 .../apache/hadoop/hbase/mob/MobFileCache.java   | 270 +++++++++++
 .../apache/hadoop/hbase/mob/MobFileName.java    | 169 +++++++
 .../apache/hadoop/hbase/mob/MobStoreEngine.java |  40 ++
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 263 +++++++++++
 .../hbase/regionserver/DefaultStoreEngine.java  |  16 +-
 .../hadoop/hbase/regionserver/HMobStore.java    | 268 +++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +
 .../hadoop/hbase/regionserver/HStore.java       |  48 +-
 .../hbase/regionserver/MobStoreScanner.java     |  69 +++
 .../regionserver/ReversedMobStoreScanner.java   |  69 +++
 .../hadoop/hbase/regionserver/StoreFile.java    |   2 +-
 .../apache/hadoop/hbase/mob/MobTestUtil.java    |  86 ++++
 .../hadoop/hbase/mob/TestCachedMobFile.java     | 154 ++++++
 .../hbase/mob/TestDefaultMobStoreFlusher.java   | 193 ++++++++
 .../hbase/mob/TestMobDataBlockEncoding.java     | 141 ++++++
 .../apache/hadoop/hbase/mob/TestMobFile.java    | 124 +++++
 .../hadoop/hbase/mob/TestMobFileCache.java      | 206 ++++++++
 .../hadoop/hbase/mob/TestMobFileName.java       |  79 ++++
 .../hbase/regionserver/TestDeleteMobTable.java  | 225 +++++++++
 .../hbase/regionserver/TestHMobStore.java       | 471 +++++++++++++++++++
 .../hbase/regionserver/TestMobStoreScanner.java | 318 +++++++++++++
 29 files changed, 3853 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index a21f7de..8613a35 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,8 @@ public final class TagType {
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
   public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
+
+  // mob tags
+  public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5;
+  public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index c724c27..872bbc5 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1454,4 +1454,33 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.http.staticuser.user</name>
     <value>dr.stack</value>
   </property>
+  <!-- Mob properties. -->
+  <property>
+    <name>hbase.mob.file.cache.size</name>
+    <value>1000</value>
+    <description>
+      Number of opened file handlers to cache.
+      A larger value will benefit reads by providing more file handlers per mob
+      file cache and would reduce frequent file opening and closing.
+      However, if this is set too high, this could lead to a
+      "too many opened file handlers" error.
+      The default value is 1000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.cache.evict.period</name>
+    <value>3600</value>
+    <description>
+      The amount of time in seconds before the mob cache evicts cached mob files.
+      The default value is 3600 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.cache.evict.remain.ratio</name>
+    <value>0.5f</value>
+    <description>
+      The ratio (between 0.0 and 1.0) of files that remain cached after an eviction
+      is triggered when the number of cached mob files exceeds hbase.mob.file.cache.size.
+      The default value is 0.5f.
+    </description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 730da73..b5cdae9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
 
 @InterfaceAudience.Private
 public class DeleteTableHandler extends TableEventHandler {
@@ -152,10 +156,36 @@ public class DeleteTableHandler extends TableEventHandler {
           tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
     }
 
+    // Archive the mob data if there is a mob-enabled column
+    HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
+    boolean hasMob = false;
+    for (HColumnDescriptor hcd : hcds) {
+      if (MobUtils.isMobFamily(hcd)) {
+        hasMob = true;
+        break;
+      }
+    }
+    Path mobTableDir = null;
+    if (hasMob) {
+      // Archive mob data
+      mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME),
+          tableName);
+      Path regionDir =
+          new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
+      if (fs.exists(regionDir)) {
+        HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
+      }
+    }
     // 4. Delete table directory from FS (temp directory)
     if (!fs.delete(tempTableDir, true)) {
       LOG.error("Couldn't delete " + tempTableDir);
     }
+    // Delete the table directory where the mob files are saved
+    if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
+      if (!fs.delete(mobTableDir, true)) {
+        LOG.error("Couldn't delete " + mobTableDir);
+      }
+    }
 
     LOG.debug("Table '" + tableName + "' archived!");
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
new file mode 100644
index 0000000..457eb6c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * A mob file held in the mob file cache. It records an access count, which drives
+ * its ordering, and a reference count that decides when the underlying reader
+ * is closed.
+ */
+@InterfaceAudience.Private
+public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> {
+
+  private final AtomicLong referenceCount = new AtomicLong(0);
+  private long accessCount;
+
+  public CachedMobFile(StoreFile sf) {
+    super(sf);
+  }
+
+  /**
+   * Creates a cached mob file backed by a store file at the given path,
+   * opened without a bloom filter.
+   */
+  public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
+      CacheConfig cacheConf) throws IOException {
+    return new CachedMobFile(new StoreFile(fs, path, conf, cacheConf, BloomType.NONE));
+  }
+
+  /**
+   * Records the given access count on this file.
+   * @param accessCount the access count to store
+   */
+  public void access(long accessCount) {
+    this.accessCount = accessCount;
+  }
+
+  /**
+   * Orders files by descending access count: a file with a larger access count
+   * sorts before one with a smaller count.
+   */
+  @Override
+  public int compareTo(CachedMobFile that) {
+    if (this.accessCount < that.accessCount) {
+      return 1;
+    }
+    if (this.accessCount > that.accessCount) {
+      return -1;
+    }
+    return 0;
+  }
+
+  /**
+   * Two cached mob files are equal when they compare as equal, i.e. when their
+   * access counts match.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(obj instanceof CachedMobFile)) {
+      return false;
+    }
+    CachedMobFile other = (CachedMobFile) obj;
+    return compareTo(other) == 0;
+  }
+
+  /**
+   * Hashes the access count, the same field that equals() compares.
+   */
+  @Override
+  public int hashCode() {
+    return Long.valueOf(accessCount).hashCode();
+  }
+
+  /**
+   * Opens the mob file if it's not opened yet and increases the reference.
+   * It's not thread-safe. Use MobFileCache.openFile() instead.
+   * However many times this method is invoked, the reader is only opened
+   * when it is not open already.
+   * The reference counts how many times this reader is referenced; once it
+   * drops to 0 the reader is closed.
+   */
+  @Override
+  public void open() throws IOException {
+    super.open();
+    referenceCount.incrementAndGet();
+  }
+
+  /**
+   * Decreases the reference of the underlying reader for the mob file.
+   * It's not thread-safe. Use MobFileCache.closeFile() instead.
+   * The underlying reader is only closed when the reference reaches 0.
+   */
+  @Override
+  public void close() throws IOException {
+    if (referenceCount.decrementAndGet() == 0) {
+      super.close();
+    }
+  }
+
+  /**
+   * Gets the reference of the current mob file.
+   * Internal usage, currently it's for testing.
+   * @return The reference of the current mob file.
+   */
+  public long getReferenceCount() {
+    return this.referenceCount.get();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
new file mode 100644
index 0000000..4a036a7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
+ * The store handed to the constructor is cast to HMobStore, so this flusher can only be used
+ * with a mob store. It flushes the MemStore into two places.
+ * One is the store files of HBase, the other is the mob files.
+ * <ol>
+ * <li>Cells that are not PUT type or have the delete mark, and cells that already are mob
+ * reference cells, will be directly flushed to HBase.</li>
+ * <li>If the size of a cell value is larger than a threshold, it'll be flushed
+ * to a mob file, another cell with the name of this file will be flushed to HBase.</li>
+ * <li>If the size of a cell value is smaller than or equal to the threshold, it'll be flushed
+ * to HBase directly.</li>
+ * </ol>
+ *
+ */
+@InterfaceAudience.Private
+public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
+
+  private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
+  // Serializes the writer creation and the flush itself within this flusher instance.
+  private final Object flushLock = new Object();
+  // Cells whose value is longer than this threshold are written to the mob file.
+  private long mobCellValueSizeThreshold = 0;
+  // The directory the finished mob files are committed to.
+  private Path targetPath;
+  private HMobStore mobStore;
+
+  /**
+   * Creates the flusher, reads the mob threshold of the column family and makes sure that the
+   * mob family directory exists.
+   * @param conf The current configuration.
+   * @param store The store to flush. It is cast to HMobStore.
+   * @throws IOException If the mob family directory cannot be checked or created.
+   */
+  public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
+    super(conf, store);
+    mobCellValueSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
+    this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
+        store.getColumnFamilyName());
+    if (!this.store.getFileSystem().exists(targetPath)) {
+      this.store.getFileSystem().mkdirs(targetPath);
+    }
+    this.mobStore = (HMobStore) store;
+  }
+
+  /**
+   * Flushes the snapshot of the MemStore.
+   * The flusher flushes the MemStore into two places.
+   * One is the store files of HBase, the other is the mob files.
+   * <ol>
+   * <li>Cells that are not PUT type or have the delete mark, and cells that already are mob
+   * reference cells, will be directly flushed to HBase.</li>
+   * <li>If the size of a cell value is larger than a threshold, it'll be
+   * flushed to a mob file, another cell with the name of this file will be flushed to HBase.</li>
+   * <li>If the size of a cell value is smaller than or equal to the threshold, it'll be flushed
+   * to HBase directly.</li>
+   * </ol>
+   * @param snapshot The MemStore snapshot to flush.
+   * @param cacheFlushId The sequence id of this flush.
+   * @param status Task that represents the flush operation and may be updated with status.
+   * @return The paths of the written tmp store files. The list is empty if the snapshot has no
+   * cells or no scanner could be created.
+   * @throws IOException
+   */
+  @Override
+  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+      MonitoredTask status) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    int cellsCount = snapshot.getCellsCount();
+    if (cellsCount == 0) return result; // don't flush if there are no entries
+
+    // Use a store scanner to find which rows to flush.
+    long smallestReadPoint = store.getSmallestReadPoint();
+    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+    if (scanner == null) {
+      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+    }
+    StoreFile.Writer writer;
+    try {
+      // TODO: We can fail in the below block before we complete adding this flush to
+      // list of store files. Add cleanup of anything put on filesystem if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + store + ": creating writer");
+        // Write the map out to the disk
+        writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
+            false, true, true);
+        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
+        try {
+          // It's a mob store, flush the cells in a mob way. This is the difference of flushing
+          // between a normal and a mob store.
+          performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
+        } finally {
+          // The store file writer is finalized even when the mob flush fails.
+          finalizeWriter(writer, cacheFlushId, status);
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+    LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
+        + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom()
+        + ", into tmp file " + writer.getPath());
+    result.add(writer.getPath());
+    return result;
+  }
+
+  /**
+   * Flushes the cells in the mob store.
+   * In the mob store, the cells with PUT type might have or have no mob tags.
+   * <ol>
+   * <li>If a cell does not have a mob tag, flushing the cell to different files depends
+   * on the value length. If the length is larger than a threshold, it's flushed to a
+   * mob file and a reference cell holding the mob file name is flushed to a store file in
+   * HBase. Otherwise, directly flush the cell to a store file in HBase.</li>
+   * <li>If a cell has a mob tag, its value is a mob file name, directly flush it
+   * to a store file in HBase.</li>
+   * </ol>
+   * The mob file is committed to the target folder if at least one cell was written to it,
+   * otherwise the empty temp file is deleted.
+   * @param snapshot Memstore snapshot.
+   * @param cacheFlushId Log cache flush sequence number.
+   * @param scanner The scanner of memstore snapshot.
+   * @param writer The store file writer.
+   * @param status Task that represents the flush operation and may be updated with status.
+   * @throws IOException
+   */
+  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
+      InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
+    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
+        HConstants.COMPACTION_KV_MAX_DEFAULT);
+    long mobKVCount = 0;
+    long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
+    StoreFile.Writer mobFileWriter = mobStore.createWriterInTmp(new Date(time),
+        snapshot.getCellsCount(), store.getFamily().getCompression(),
+        store.getRegionInfo().getStartKey());
+    // the target path is {tableName}/.mob/{cfName}/mobFiles
+    // the relative path is mobFiles
+    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+    try {
+      Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+          .getName());
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore;
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        if (!cells.isEmpty()) {
+          for (Cell c : cells) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+            // Small values, cells that already are mob references and non-PUT cells go
+            // straight to the store file.
+            if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv)
+                || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+              writer.append(kv);
+            } else {
+              // append the original keyValue in the mob file.
+              mobFileWriter.append(kv);
+              mobKVCount++;
+
+              // append the tags to the KeyValue.
+              // The key is same, the value is the filename of the mob file
+              KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+              writer.append(reference);
+            }
+          }
+          cells.clear();
+        }
+      } while (hasMore);
+    } finally {
+      try {
+        status.setStatus("Flushing mob file " + store + ": appending metadata");
+        mobFileWriter.appendMetadata(cacheFlushId, false);
+      } finally {
+        // Close the mob file writer even when appending the metadata fails, otherwise the
+        // writer would be leaked.
+        status.setStatus("Flushing mob file " + store + ": closing flushed file");
+        mobFileWriter.close();
+      }
+    }
+
+    if (mobKVCount > 0) {
+      // commit the mob file from temp folder to target folder.
+      // If the mob file is committed successfully but the store file is not,
+      // the committed mob file will be handled by the sweep tool as an unused
+      // file.
+      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
+    } else {
+      try {
+        // If the mob file is empty, delete it instead of committing.
+        store.getFileSystem().delete(mobFileWriter.getPath(), true);
+      } catch (IOException e) {
+        // Best effort: a left-over empty temp file does not fail the flush.
+        LOG.error("Fail to delete the temp mob file", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
new file mode 100644
index 0000000..b160010
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
+/**
+ * The cache configuration for the mob.
+ * Besides what CacheConfig provides, it gives access to a MobFileCache that is kept in a
+ * static field and therefore shared by all instances of this class.
+ */
+@InterfaceAudience.Private
+public class MobCacheConfig extends CacheConfig {
+
+  // Created at most once, by instantiateMobFileCache().
+  private static MobFileCache mobFileCache;
+
+  /**
+   * Creates the cache configuration and makes sure that the shared MobFileCache exists.
+   * @param conf The current configuration.
+   * @param family The descriptor of the column family.
+   */
+  public MobCacheConfig(Configuration conf, HColumnDescriptor family) {
+    super(conf, family);
+    instantiateMobFileCache(conf);
+  }
+
+  /**
+   * Instantiates the MobFileCache if it has not been instantiated yet.
+   * Later calls leave the existing cache untouched, whatever configuration they pass.
+   * @param conf The current configuration.
+   */
+  public static synchronized void instantiateMobFileCache(Configuration conf) {
+    if (mobFileCache != null) {
+      return;
+    }
+    mobFileCache = new MobFileCache(conf);
+  }
+
+  /**
+   * Gets the MobFileCache.
+   * @return The shared MobFileCache.
+   */
+  public MobFileCache getMobFileCache() {
+    return mobFileCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
new file mode 100644
index 0000000..d208722
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The constants used in mob.
+ * This class only holds constants and cannot be instantiated.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MobConstants {
+
+  // Attribute names and the default threshold of the mob value size.
+  public static final byte[] IS_MOB = Bytes.toBytes("isMob");
+  public static final byte[] MOB_THRESHOLD = Bytes.toBytes("mobThreshold");
+  public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
+
+  // Configuration/attribute keys related to scanning.
+  public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
+  public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+
+  // The size of the mob file cache, counted in cached mob files.
+  public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
+  public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
+
+  // Names used in the mob directory layout.
+  public static final String MOB_DIR_NAME = "mobdir";
+  public static final String MOB_REGION_NAME = ".mob";
+  public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
+
+  // Configuration keys of the eviction in the mob file cache.
+  public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
+  public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
+
+  // The tag carried by mob reference cells; it has an empty value.
+  public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
+      HConstants.EMPTY_BYTE_ARRAY);
+
+  // Defaults of the eviction; MobFileCache schedules the eviction period in seconds.
+  public static final float DEFAULT_EVICT_REMAIN_RATIO = 0.5f;
+  public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600L;
+
+  public static final String TEMP_DIR_NAME = ".tmp";
+
+  // Not instantiable.
+  private MobConstants() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
new file mode 100644
index 0000000..a120057
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+
+/**
+ * The mob file. It wraps a single StoreFile and offers the operations needed to open it,
+ * read cells from it and close it.
+ */
+@InterfaceAudience.Private
+public class MobFile {
+
+  // The wrapped store file; reset to null by close().
+  private StoreFile sf;
+
+  // internal use only for sub classes
+  protected MobFile() {
+  }
+
+  protected MobFile(StoreFile sf) {
+    this.sf = sf;
+  }
+
+  /**
+   * Internal use only. This is used by the sweeper.
+   * The scanner is created with block caching turned off.
+   *
+   * @return The store file scanner.
+   * @throws IOException
+   */
+  public StoreFileScanner getScanner() throws IOException {
+    return createScanners(false).get(0);
+  }
+
+  /**
+   * Reads a cell from the mob file.
+   * The scanner used for the lookup is closed before this method returns.
+   * @param search The cell need to be searched in the mob file.
+   * @param cacheMobBlocks Should this scanner cache blocks.
+   * @return The cell in the mob file, or null if no scanner is available or the seek does
+   * not succeed.
+   * @throws IOException
+   */
+  public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
+    StoreFileScanner scanner = null;
+    try {
+      List<StoreFileScanner> scanners = createScanners(cacheMobBlocks);
+      if (scanners.isEmpty()) {
+        return null;
+      }
+      scanner = scanners.get(0);
+      return scanner.seek(search) ? scanner.peek() : null;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /**
+   * Gets the file name.
+   * @return The last component of the path of the wrapped store file.
+   */
+  public String getFileName() {
+    return sf.getPath().getName();
+  }
+
+  /**
+   * Opens the underlying reader if the store file does not have one yet.
+   * It's not thread-safe. Use MobFileCache.openFile() instead.
+   * @throws IOException
+   */
+  public void open() throws IOException {
+    if (sf.getReader() != null) {
+      return;
+    }
+    sf.createReader();
+  }
+
+  /**
+   * Closes the underlying reader, but do no evict blocks belonging to this file.
+   * The store file is dropped afterwards, so calling close() again does nothing.
+   * It's not thread-safe. Use MobFileCache.closeFile() instead.
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    if (sf == null) {
+      return;
+    }
+    sf.closeReader(false);
+    sf = null;
+  }
+
+  /**
+   * Creates an instance of the MobFile.
+   * The underlying StoreFile is created with BloomType.NONE.
+   * @param fs The file system.
+   * @param path The path of the underlying StoreFile.
+   * @param conf The configuration.
+   * @param cacheConf The CacheConfig.
+   * @return An instance of the MobFile.
+   * @throws IOException
+   */
+  public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
+      throws IOException {
+    return new MobFile(new StoreFile(fs, path, conf, cacheConf, BloomType.NONE));
+  }
+
+  /**
+   * Creates the scanners over the wrapped store file.
+   * @param cacheBlocks Should the scanners cache blocks.
+   * @return The scanners created for the wrapped store file.
+   * @throws IOException
+   */
+  private List<StoreFileScanner> createScanners(boolean cacheBlocks) throws IOException {
+    List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+    storeFiles.add(sf);
+    return StoreFileScanner.getScannersForStoreFiles(storeFiles, cacheBlocks, true,
+        false, null, sf.getMaxMemstoreTS());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
new file mode 100644
index 0000000..97530b1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.IdLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cache for mob files.
+ * This cache doesn't cache the mob file blocks. It only caches the references of mob files.
+ * We are doing this to avoid opening and closing mob files all the time. We just keep
+ * references open.
+ */
+@InterfaceAudience.Private
+public class MobFileCache {
+
+  private static final Log LOG = LogFactory.getLog(MobFileCache.class);
+
+  /*
+   * Eviction and statistics thread. Periodically run to print the statistics and
+   * evict the lru cached mob files when the count of the cached files is larger
+   * than the threshold.
+   */
+  static class EvictionThread extends Thread {
+    MobFileCache lru;
+
+    public EvictionThread(MobFileCache lru) {
+      super("MobFileCache.EvictionThread");
+      setDaemon(true);
+      this.lru = lru;
+    }
+
+    @Override
+    public void run() {
+      lru.evict();
+    }
+  }
+
+  // a ConcurrentHashMap, accesses to this map are synchronized.
+  private Map<String, CachedMobFile> map = null;
+  // caches access count
+  private final AtomicLong count;
+  private long lastAccess;
+  private final AtomicLong miss;
+  private long lastMiss;
+
+  // a lock to sync the evict to guarantee the eviction occurs in sequence.
+  // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there.
+  private final ReentrantLock evictionLock = new ReentrantLock(true);
+
+  //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations.
+  private final IdLock keyLock = new IdLock();
+
+  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
+  private final Configuration conf;
+
+  // the count of the cached references to mob files
+  private final int mobFileMaxCacheSize;
+  private final boolean isCacheEnabled;
+  private float evictRemainRatio;
+
+  public MobFileCache(Configuration conf) {
+    this.conf = conf;
+    this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
+        MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
+    isCacheEnabled = (mobFileMaxCacheSize > 0);
+    map = new ConcurrentHashMap<String, CachedMobFile>(mobFileMaxCacheSize);
+    this.count = new AtomicLong(0);
+    this.miss = new AtomicLong(0);
+    this.lastAccess = 0;
+    this.lastMiss = 0;
+    if (isCacheEnabled) {
+      long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
+          MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
+      evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
+          MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
+      if (evictRemainRatio < 0.0) {
+        evictRemainRatio = 0.0f;
+        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
+      } else if (evictRemainRatio > 1.0) {
+        evictRemainRatio = 1.0f;
+        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
+      }
+      this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
+          TimeUnit.SECONDS);
+    }
+    LOG.info("MobFileCache is initialized, and the cache size is " + mobFileMaxCacheSize);
+  }
+
+  /**
+   * Evicts the lru cached mob files when the count of the cached files is larger
+   * than the threshold.
+   */
+  public void evict() {
+    if (isCacheEnabled) {
+      // Ensure only one eviction at a time
+      if (!evictionLock.tryLock()) {
+        return;
+      }
+      printStatistics();
+      List<CachedMobFile> evictedFiles = new ArrayList<CachedMobFile>();
+      try {
+        if (map.size() <= mobFileMaxCacheSize) {
+          return;
+        }
+        List<CachedMobFile> files = new ArrayList<CachedMobFile>(map.values());
+        Collections.sort(files);
+        int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
+        if (start >= 0) {
+          for (int i = start; i < files.size(); i++) {
+            String name = files.get(i).getFileName();
+            CachedMobFile evictedFile = map.remove(name);
+            if (evictedFile != null) {
+              evictedFiles.add(evictedFile);
+            }
+          }
+        }
+      } finally {
+        evictionLock.unlock();
+      }
+      // EvictionLock is released. Close the evicted files one by one.
+      // The closes are sync in the closeFile method.
+      for (CachedMobFile evictedFile : evictedFiles) {
+        closeFile(evictedFile);
+      }
+    }
+  }
+
+  /**
+   * Evicts the cached file by the name.
+   * @param fileName The name of a cached file.
+   */
+  public void evictFile(String fileName) {
+    if (isCacheEnabled) {
+      IdLock.Entry lockEntry = null;
+      try {
+        // obtains the lock to close the cached file.
+        lockEntry = keyLock.getLockEntry(fileName.hashCode());
+        CachedMobFile evictedFile = map.remove(fileName);
+        if (evictedFile != null) {
+          evictedFile.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Fail to evict the file " + fileName, e);
+      } finally {
+        if (lockEntry != null) {
+          keyLock.releaseLockEntry(lockEntry);
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens a mob file.
+   * @param fs The current file system.
+   * @param path The file path.
+   * @param cacheConf The current MobCacheConfig
+   * @return A opened mob file.
+   * @throws IOException
+   */
+  public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
+    if (!isCacheEnabled) {
+      return MobFile.create(fs, path, conf, cacheConf);
+    } else {
+      String fileName = path.getName();
+      CachedMobFile cached = map.get(fileName);
+      IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
+      try {
+        if (cached == null) {
+          cached = map.get(fileName);
+          if (cached == null) {
+            if (map.size() > mobFileMaxCacheSize) {
+              evict();
+            }
+            cached = CachedMobFile.create(fs, path, conf, cacheConf);
+            cached.open();
+            map.put(fileName, cached);
+            miss.incrementAndGet();
+          }
+        }
+        cached.open();
+        cached.access(count.incrementAndGet());
+      } finally {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+      return cached;
+    }
+  }
+
+  /**
+   * Closes a mob file.
+   * @param file The mob file that needs to be closed.
+   */
+  public void closeFile(MobFile file) {
+    IdLock.Entry lockEntry = null;
+    try {
+      if (!isCacheEnabled) {
+        file.close();
+      } else {
+        lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
+        file.close();
+      }
+    } catch (IOException e) {
+      LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
+    } finally {
+      if (lockEntry != null) {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+    }
+  }
+
+  /**
+   * Gets the count of cached mob files.
+   * @return The count of the cached mob files.
+   */
+  public int getCacheSize() {
+    return map == null ? 0 : map.size();
+  }
+
+  /**
+   * Prints the statistics.
+   */
+  public void printStatistics() {
+    long access = count.get() - lastAccess;
+    long missed = miss.get() - lastMiss;
+    long hitRate = (access - missed) * 100 / access;
+    LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
+        + (access - missed) + ", hit rate: "
+        + ((access == 0) ? 0 : hitRate) + "%");
+    lastAccess += access;
+    lastMiss += missed;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
new file mode 100644
index 0000000..937e965
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * The mob file name.
+ * It consists of a md5 of a start key, a date and an uuid.
+ * It looks like md5(start) + date + uuid.
+ * <ol>
+ * <li>0-31 characters: md5 hex string of a start key. Since the length of the start key is not
+ * fixed, the md5 is used instead because it has a fixed length.</li>
+ * <li>32-39 characters: a string of a date with format yyyyMMdd. The date is the latest timestamp
+ * of cells in this file.</li>
+ * <li>the remaining characters: the uuid.</li>
+ * </ol>
+ * Using the md5 hex string of the start key as the prefix of the file name makes the files with
+ * the same start key share a prefix that differs from the ones with other start keys.
+ * The cells that come from different regions might be in the same mob file after a region split,
+ * this is allowed.
+ * The latest timestamp of cells is kept in the file name in order to clean the expired mob files
+ * by TTL easily. If this timestamp is older than the TTL, the file is regarded as expired.
+ */
+@InterfaceAudience.Private
+public class MobFileName {
+
+  /** End index (exclusive) of the md5 hex string of the start key in a file name. */
+  private static final int STARTKEY_END_INDEX = 32;
+  /** End index (exclusive) of the yyyyMMdd date string in a file name. */
+  private static final int DATE_END_INDEX = 40;
+
+  private final String date;
+  private final String startKey;
+  private final String uuid;
+  private final String fileName;
+
+  /**
+   * @param startKey
+   *          The start key. Its md5 hex string is what is kept in the name.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyyMMdd.
+   * @param uuid
+   *          The uuid
+   */
+  private MobFileName(byte[] startKey, String date, String uuid) {
+    this(MD5Hash.getMD5AsHex(startKey, 0, startKey.length), date, uuid);
+  }
+
+  /**
+   * @param startKey
+   *          The md5 hex string of the start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyyMMdd.
+   * @param uuid
+   *          The uuid
+   */
+  private MobFileName(String startKey, String date, String uuid) {
+    this.startKey = startKey;
+    this.uuid = uuid;
+    this.date = date;
+    this.fileName = this.startKey + date + uuid;
+  }
+
+  /**
+   * Creates an instance of MobFileName
+   *
+   * @param startKey
+   *          The start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyyMMdd.
+   * @param uuid The uuid.
+   * @return An instance of a MobFileName.
+   */
+  public static MobFileName create(byte[] startKey, String date, String uuid) {
+    return new MobFileName(startKey, date, uuid);
+  }
+
+  /**
+   * Creates an instance of MobFileName
+   *
+   * @param startKey
+   *          The md5 hex string of the start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyyMMdd.
+   * @param uuid The uuid.
+   * @return An instance of a MobFileName.
+   */
+  public static MobFileName create(String startKey, String date, String uuid) {
+    return new MobFileName(startKey, date, uuid);
+  }
+
+  /**
+   * Creates an instance of MobFileName by splitting an existing file name into its parts.
+   * @param fileName The string format of a file name. It must have at least 40 characters,
+   *          a shorter name makes the substring calls throw StringIndexOutOfBoundsException.
+   * @return An instance of a MobFileName.
+   */
+  public static MobFileName create(String fileName) {
+    // The format of a file name is md5HexString(0-31bytes) + date(32-39bytes) + UUID
+    // The date format is yyyyMMdd
+    String startKey = fileName.substring(0, STARTKEY_END_INDEX);
+    String date = fileName.substring(STARTKEY_END_INDEX, DATE_END_INDEX);
+    String uuid = fileName.substring(DATE_END_INDEX);
+    return new MobFileName(startKey, date, uuid);
+  }
+
+  /**
+   * Gets the hex string of the md5 for a start key.
+   * @return The hex string of the md5 for a start key.
+   */
+  public String getStartKey() {
+    return startKey;
+  }
+
+  /**
+   * Gets the date string. Its format is yyyyMMdd.
+   * @return The date string.
+   */
+  public String getDate() {
+    return this.date;
+  }
+
+  @Override
+  public int hashCode() {
+    // The file name is exactly startKey + date + uuid, so hashing it gives the same value as
+    // concatenating the three parts again.
+    return fileName.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object anObject) {
+    if (this == anObject) {
+      return true;
+    }
+    if (!(anObject instanceof MobFileName)) {
+      return false;
+    }
+    MobFileName another = (MobFileName) anObject;
+    return this.startKey.equals(another.startKey) && this.date.equals(another.date)
+        && this.uuid.equals(another.uuid);
+  }
+
+  /**
+   * Gets the file name.
+   * @return The file name.
+   */
+  public String getFileName() {
+    return this.fileName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
new file mode 100644
index 0000000..d5e6f2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * The store engine for mob column families. It overrides the creation of the store flusher
+ * and inherits everything else from DefaultStoreEngine.
+ */
+@InterfaceAudience.Private
+public class MobStoreEngine extends DefaultStoreEngine {
+
+  /**
+   * Installs a DefaultMobStoreFlusher as the store flusher of this engine.
+   */
+  @Override
+  protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
+    // The flusher of a mob store is always the DefaultMobStoreFlusher.
+    // The compactor and the compaction policy stay the ones of DefaultStoreEngine, MOB specific
+    // ones can be used when they are implemented.
+    this.storeFlusher = new DefaultMobStoreFlusher(conf, store);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
new file mode 100644
index 0000000..149eed4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The mob utilities
+ */
+@InterfaceAudience.Private
+public class MobUtils {
+
+  // SimpleDateFormat is not thread-safe, so every thread gets its own yyyyMMdd formatter.
+  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
+      new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return new SimpleDateFormat("yyyyMMdd");
+    }
+  };
+
+  /**
+   * Private constructor to keep this utility class from being instantiated.
+   */
+  private MobUtils() {
+  }
+
+  /**
+   * Indicates whether the column family is a mob one.
+   * @param hcd The descriptor of a column family.
+   * @return True if this column family is a mob one, false if it's not.
+   */
+  public static boolean isMobFamily(HColumnDescriptor hcd) {
+    byte[] isMob = hcd.getValue(MobConstants.IS_MOB);
+    return isMob != null && isMob.length == 1 && Bytes.toBoolean(isMob);
+  }
+
+  /**
+   * Gets the mob threshold.
+   * If the size of a cell value is larger than this threshold, it's regarded as a mob.
+   * The default threshold is 1024*100(100K)B.
+   * @param hcd The descriptor of a column family.
+   * @return The threshold. The default one is returned when the descriptor has no threshold
+   *         or the stored value doesn't have the length of a long.
+   */
+  public static long getMobThreshold(HColumnDescriptor hcd) {
+    byte[] threshold = hcd.getValue(MobConstants.MOB_THRESHOLD);
+    return threshold != null && threshold.length == Bytes.SIZEOF_LONG ? Bytes.toLong(threshold)
+        : MobConstants.DEFAULT_MOB_THRESHOLD;
+  }
+
+  /**
+   * Formats a date to a string.
+   * @param date The date.
+   * @return The string format of the date, it's yyyyMMdd.
+   */
+  public static String formatDate(Date date) {
+    return LOCAL_FORMAT.get().format(date);
+  }
+
+  /**
+   * Parses the string to a date.
+   * @param dateString The string format of a date, it's yyyyMMdd.
+   * @return A date.
+   * @throws ParseException If the string cannot be parsed as a date.
+   */
+  public static Date parseDate(String dateString) throws ParseException {
+    return LOCAL_FORMAT.get().parse(dateString);
+  }
+
+  /**
+   * Whether the current cell is a mob reference cell.
+   * @param cell The current cell.
+   * @return True if the cell has a mob reference tag, false if it doesn't.
+   */
+  public static boolean isMobReferenceCell(Cell cell) {
+    if (cell.getTagsLength() > 0) {
+      Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+          TagType.MOB_REFERENCE_TAG_TYPE);
+      return tag != null;
+    }
+    return false;
+  }
+
+  /**
+   * Whether the tag list has a mob reference tag.
+   * @param tags The tag list.
+   * @return True if the list has a mob reference tag, false if it doesn't.
+   */
+  public static boolean hasMobReferenceTag(List<Tag> tags) {
+    // An empty list simply skips the loop, no separate emptiness check is needed.
+    for (Tag tag : tags) {
+      if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Indicates whether it's a raw scan.
+   * The information is set in the attribute "hbase.mob.scan.raw" of scan.
+   * For a mob cell, in a normal scan the scanner retrieves the mob cell from the mob file.
+   * In a raw scan, the scanner directly returns the cell in HBase without retrieving the one in
+   * the mob file.
+   * @param scan The current scan.
+   * @return True if it's a raw scan.
+   */
+  public static boolean isRawMobScan(Scan scan) {
+    return isBooleanAttributeSet(scan, MobConstants.MOB_SCAN_RAW);
+  }
+
+  /**
+   * Indicates whether the scan contains the information of caching blocks.
+   * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
+   * @param scan The current scan.
+   * @return True when the Scan attribute specifies to cache the MOB blocks.
+   */
+  public static boolean isCacheMobBlocks(Scan scan) {
+    return isBooleanAttributeSet(scan, MobConstants.MOB_CACHE_BLOCKS);
+  }
+
+  /**
+   * Reads a scan attribute as a boolean.
+   * @param scan The current scan.
+   * @param attributeName The name of the attribute.
+   * @return True only if the attribute is present and decodes to true. False if it's absent,
+   *         decodes to false or cannot be decoded.
+   */
+  private static boolean isBooleanAttributeSet(Scan scan, String attributeName) {
+    byte[] value = scan.getAttribute(attributeName);
+    try {
+      return value != null && Bytes.toBoolean(value);
+    } catch (IllegalArgumentException e) {
+      // The value could not be decoded as a boolean, regard the attribute as not set.
+      return false;
+    }
+  }
+
+  /**
+   * Sets the attribute of caching blocks in the scan.
+   *
+   * @param scan
+   *          The current scan.
+   * @param cacheBlocks
+   *          True, set the attribute of caching blocks into the scan, the scanner with this scan
+   *          caches blocks.
+   *          False, the scanner doesn't cache blocks for this scan.
+   */
+  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
+    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
+  }
+
+  /**
+   * Gets the root dir of the mob files.
+   * It's {HBASE_DIR}/mobdir.
+   * @param conf The current configuration.
+   * @return the root dir of the mob file.
+   */
+  public static Path getMobHome(Configuration conf) {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+  }
+
+  /**
+   * Gets the region dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
+   * The region is the dummy mob region of the table, see {@link #getMobRegionInfo(TableName)}.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @return The region dir of the mob files.
+   */
+  public static Path getMobRegionPath(Configuration conf, TableName tableName) {
+    Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    HRegionInfo regionInfo = getMobRegionInfo(tableName);
+    return new Path(tablePath, regionInfo.getEncodedName());
+  }
+
+  /**
+   * Gets the family dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param familyName The current family name.
+   * @return The family dir of the mob files.
+   */
+  public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+    return new Path(getMobRegionPath(conf, tableName), familyName);
+  }
+
+  /**
+   * Gets the family dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+   * @param regionPath The path of mob region which is a dummy one.
+   * @param familyName The current family name.
+   * @return The family dir of the mob files.
+   */
+  public static Path getMobFamilyPath(Path regionPath, String familyName) {
+    return new Path(regionPath, familyName);
+  }
+
+  /**
+   * Gets the HRegionInfo of the mob files.
+   * This is a dummy region. The mob files are not saved in a region in HBase.
+   * This is only used in mob snapshot. It's internally used only.
+   * @param tableName The current table name.
+   * @return A dummy mob region info.
+   */
+  public static HRegionInfo getMobRegionInfo(TableName tableName) {
+    return new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
+        HConstants.EMPTY_END_ROW, false, 0);
+  }
+
+  /**
+   * Creates a mob reference KeyValue.
+   * The value of the mob reference KeyValue is mobCellValueSize + mobFileName, where the size
+   * is written as a long.
+   * @param kv The original KeyValue.
+   * @param fileName The mob file name where the mob reference KeyValue is written.
+   * @param tableNameTag The tag of the current table name. It's very important in
+   *                        cloning the snapshot.
+   * @return The mob reference KeyValue. It has the row, family, qualifier, timestamp and
+   *         sequence id of the original one, and its type is Put.
+   */
+  public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+    // Append the tags to the KeyValue.
+    // The key is same, the value is the filename of the mob file
+    List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
+    existingTags.add(MobConstants.MOB_REF_TAG);
+    // Add the tag of the source table name, this table is where this mob file is flushed
+    // from.
+    // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
+    // find the original mob files by this table name. For details please see cloning
+    // snapshot for mob files.
+    existingTags.add(tableNameTag);
+    // The length is widened to a long on purpose, the reference value starts with 8 bytes.
+    long valueLength = kv.getValueLength();
+    byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
+    KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
+        kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
+        refValue, 0, refValue.length, existingTags);
+    reference.setSequenceId(kv.getSequenceId());
+    return reference;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index f2a0b06..4afa80c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -63,6 +63,12 @@ public class DefaultStoreEngine extends StoreEngine<
   protected void createComponents(
       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
     storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
+    createCompactor(conf, store);
+    createCompactionPolicy(conf, store);
+    createStoreFlusher(conf, store);
+  }
+
+  protected void createCompactor(Configuration conf, Store store) throws IOException {
     String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
     try {
       compactor = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -70,7 +76,10 @@ public class DefaultStoreEngine extends StoreEngine<
     } catch (Exception e) {
       throw new IOException("Unable to load configured compactor '" + className + "'", e);
     }
-    className = conf.get(
+  }
+
+  protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
+    String className = conf.get(
         DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
     try {
       compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -79,7 +88,10 @@ public class DefaultStoreEngine extends StoreEngine<
     } catch (Exception e) {
       throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
     }
-    className = conf.get(
+  }
+
+  protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
+    String className = conf.get(
         DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
     try {
       storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
new file mode 100644
index 0000000..6b4e450
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -0,0 +1,268 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobStoreEngine;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The store implementation to save MOBs (medium objects), it extends the HStore.
+ * When the descriptor of a column family is marked as a mob one (see MobUtils.isMobFamily),
+ * the HRegion instantiates a HMobStore for this column family.
+ * HMobStore is almost the same with the HStore except using different types of scanners.
+ * In the method of createScanner, the MobStoreScanner and ReversedMobStoreScanner are returned.
+ * In these scanners, additional seeks in the mob files should be performed after the seek
+ * in HBase is done.
+ */
+@InterfaceAudience.Private
+public class HMobStore extends HStore {
+
+  private final MobCacheConfig mobCacheConfig;
+  private final Path homePath;
+  private final Path mobFamilyPath;
+
+  public HMobStore(final HRegion region, final HColumnDescriptor family,
+      final Configuration confParam) throws IOException {
+    super(region, family, confParam);
+    // The cache config is the MobCacheConfig that createCacheConf of this class installs.
+    this.mobCacheConfig = (MobCacheConfig) cacheConf;
+    this.homePath = MobUtils.getMobHome(conf);
+    this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
+        family.getNameAsString());
+  }
+
+  /**
+   * Creates the mob cache config.
+   */
+  @Override
+  protected void createCacheConf(HColumnDescriptor family) {
+    cacheConf = new MobCacheConfig(conf, family);
+  }
+
+  /**
+   * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, additional seeks in
+   * the mob files should be performed after the seek in HBase is done.
+   * A scanner that is passed in is returned as it is.
+   */
+  @Override
+  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+      long readPt, KeyValueScanner scanner) throws IOException {
+    if (scanner == null) {
+      scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+          targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+    }
+    return scanner;
+  }
+
+  /**
+   * Creates the mob store engine.
+   */
+  @Override
+  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+      KVComparator kvComparator) throws IOException {
+    MobStoreEngine engine = new MobStoreEngine();
+    engine.createComponents(conf, store, kvComparator);
+    return engine;
+  }
+
+  /**
+   * Gets the temp directory. It's located under the root dir of the mob files.
+   * @return The temp directory.
+   */
+  private Path getTempDir() {
+    return new Path(homePath, MobConstants.TEMP_DIR_NAME);
+  }
+
+  /**
+   * Creates the writer for a mob file in the temp directory, it's used in flushing.
+   * @param date The latest date of cells in the flushing.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key. A null one is regarded as the empty start row.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey) throws IOException {
+    if (startKey == null) {
+      startKey = HConstants.EMPTY_START_ROW;
+    }
+    Path path = getTempDir();
+    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+  }
+
+  /**
+   * Creates the writer for a mob file under the given directory, it's used in flushing.
+   * The name of the file is built from the start key, the date and a new random uuid.
+   * @param date The date string, its format is yyyyMMdd.
+   * @param basePath The directory where the mob file is written.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey) throws IOException {
+    // A literal replace is enough to strip the dashes of the uuid, no regex is needed.
+    String uuid = UUID.randomUUID().toString().replace("-", "");
+    MobFileName mobFileName = MobFileName.create(startKey, date, uuid);
+    final CacheConfig writerCacheConf = mobCacheConfig;
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withIncludesMvcc(false).withIncludesTags(true)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+        .withBlockSize(getFamily().getBlocksize())
+        .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
+        .withFilePath(new Path(basePath, mobFileName.getFileName()))
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Commits the mob file.
+   * The source file is validated first, then it's renamed into the target directory which is
+   * created if it doesn't exist. Nothing is done if the source file is null.
+   * @param sourceFile The source file.
+   * @param targetPath The directory path where the source file is renamed to.
+   * @throws IOException If the validation or the rename fails.
+   */
+  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+    if (sourceFile == null) {
+      return;
+    }
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateMobFile(sourceFile);
+    LOG.info("Renaming flushed file from " + sourceFile + " to " + dstPath);
+    Path parent = dstPath.getParent();
+    if (!region.getFilesystem().exists(parent)) {
+      region.getFilesystem().mkdirs(parent);
+    }
+    if (!region.getFilesystem().rename(sourceFile, dstPath)) {
+      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+    }
+  }
+
+  /**
+   * Validates a mob file by opening and closing it.
+   *
+   * @param path the path to the mob file
+   * @throws IOException If the file cannot be opened. It's logged before it's rethrown.
+   */
+  private void validateMobFile(Path path) throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile =
+          new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader(false);
+      }
+    }
+  }
+
+  /**
+   * Reads the cell from the mob file.
+   * @param reference The cell found in the HBase. Its value is the length of the mob value
+   *          written as a long, followed by the name of the mob file.
+   * @param cacheBlocks Whether the scanner should cache blocks.
+   * @return The cell found in the mob file. If the reference is too short or the mob file
+   *         cannot be read, a KeyValue is returned which has the same row, family, qualifier,
+   *         timestamp, type and tags as the reference but an empty value.
+   * @throws IOException
+   */
+  public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+    Cell result = null;
+    if (reference.getValueLength() > Bytes.SIZEOF_LONG) {
+      // Skip the leading long, the rest of the value is the mob file name.
+      String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
+          + Bytes.SIZEOF_LONG, reference.getValueLength() - Bytes.SIZEOF_LONG);
+      Path targetPath = new Path(mobFamilyPath, fileName);
+      MobFile file = null;
+      try {
+        file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
+            mobCacheConfig);
+        result = file.readCell(reference, cacheBlocks);
+      } catch (IOException e) {
+        // Best effort on purpose, the fallback cell below is returned instead of failing.
+        LOG.error("Fail to open/read the mob file " + targetPath.toString(), e);
+      } catch (NullPointerException e) {
+        // When delete the file during the scan, the hdfs getBlockRange will
+        // throw NullPointerException, catch it and manage it.
+        LOG.error("Fail to read the mob file " + targetPath.toString()
+            + " since it's already deleted", e);
+      } finally {
+        if (file != null) {
+          mobCacheConfig.getMobFileCache().closeFile(file);
+        }
+      }
+    } else {
+      LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short");
+    }
+
+    if (result == null) {
+      LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
+          + "qualifier,timestamp,type and tags but with an empty value to return.");
+      result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+          reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+          reference.getFamilyLength(), reference.getQualifierArray(),
+          reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
+          Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
+          0, 0, reference.getTagsArray(), reference.getTagsOffset(),
+          reference.getTagsLength());
+    }
+    return result;
+  }
+
+  /**
+   * Gets the mob file path.
+   * @return The family dir of the mob files of this store.
+   */
+  public Path getPath() {
+    return mobFamilyPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b879e8a..c87c12b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -3552,6 +3553,9 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+    if (MobUtils.isMobFamily(family)) { // such families get the HMobStore subclass instead
+      return new HMobStore(this, family, this.conf);
+    }
     return new HStore(this, family, this.conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8d20d7b..45edf7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -132,11 +133,11 @@ public class HStore implements Store {
 
   protected final MemStore memstore;
   // This stores directory in the filesystem.
-  private final HRegion region;
+  protected final HRegion region;
   private final HColumnDescriptor family;
   private final HRegionFileSystem fs;
-  private final Configuration conf;
-  private final CacheConfig cacheConf;
+  protected final Configuration conf;
+  protected CacheConfig cacheConf;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
@@ -244,7 +245,7 @@ public class HStore implements Store {
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
-    this.cacheConf = new CacheConfig(conf, family);
+    createCacheConf(family);
 
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
@@ -263,7 +264,7 @@ public class HStore implements Store {
           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
     }
 
-    this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
+    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
 
     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -338,6 +339,27 @@ public class HStore implements Store {
   }
 
   /**
+   * Builds the {@link CacheConfig} for the given family and assigns it to {@code cacheConf}.
+   * @param family The current column family, passed to the cache config together with conf.
+   */
+  protected void createCacheConf(final HColumnDescriptor family) {
+    cacheConf = new CacheConfig(this.conf, family);
+  }
+
+  /**
+   * Creates the store engine configured for the given Store.
+   * @param store The store; an unfortunate dependency, passed to coprocessors via the compactor.
+   * @param conf Store configuration.
+   * @param kvComparator KVComparator for storeFileManager; handed to the engine as given.
+   * @return StoreEngine to use.
+   * @throws IOException propagated from {@code StoreEngine.create}.
+   */
+  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+      KVComparator kvComparator) throws IOException {
+    return StoreEngine.create(store, conf, kvComparator); // use the argument, not the field
+  }
+
+  /**
    * @param family
    * @return TTL in seconds of the specified family
    */
@@ -1886,17 +1908,23 @@ public class HStore implements Store {
       if (this.getCoprocessorHost() != null) {
         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
       }
-      if (scanner == null) {
-        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
-            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
-            getScanInfo(), scan, targetCols, readPt);
-      }
+      scanner = createScanner(scan, targetCols, readPt, scanner);
       return scanner;
     } finally {
       lock.readLock().unlock();
     }
   }
 
+  /** Returns {@code scanner} if non-null, else a new StoreScanner (reversed if the scan is). */
+  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+      long readPt, KeyValueScanner scanner) throws IOException {
+    if (scanner != null) {
+      return scanner;
+    }
+    return scan.isReversed() ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols,
+        readPt) : new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+  }
+
   @Override
   public String toString() {
     return this.getColumnFamilyName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
new file mode 100644
index 0000000..b4bcbe7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * A {@link StoreScanner} used with an {@link HMobStore}. Unless the scan is a raw MOB scan,
+ * every MOB reference cell it returns is replaced with the cell resolved through that store.
+ */
+@InterfaceAudience.Private
+public class MobStoreScanner extends StoreScanner {
+
+  // Passed to HMobStore#resolve for each reference cell; derived once from the Scan.
+  private boolean cacheMobBlocks = false;
+
+  public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      final NavigableSet<byte[]> columns, long readPt) throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+    this.cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+  }
+
+  /**
+   * Reads the next cells via the parent scanner first. Unless this is a raw MOB scan, each
+   * returned cell that is a MOB reference cell is then swapped in place for the cell that
+   * {@link HMobStore#resolve} produces for it.
+   */
+  @Override
+  public boolean next(List<Cell> outResult, int limit) throws IOException {
+    final boolean outcome = super.next(outResult, limit);
+    if (MobUtils.isRawMobScan(scan) || outResult.isEmpty()) {
+      // Nothing to resolve: the scan is raw or no cells were returned.
+      return outcome;
+    }
+    final HMobStore mobStore = (HMobStore) store;
+    final int cellCount = outResult.size();
+    for (int idx = 0; idx < cellCount; idx++) {
+      final Cell candidate = outResult.get(idx);
+      if (!MobUtils.isMobReferenceCell(candidate)) {
+        continue;
+      }
+      outResult.set(idx, mobStore.resolve(candidate, cacheMobBlocks));
+    }
+    return outcome;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
new file mode 100644
index 0000000..e384390
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * The reversed-scan counterpart for MOB stores: a {@link ReversedStoreScanner} that, unless the
+ * scan is a raw MOB scan, replaces each MOB reference cell with the cell the store resolves.
+ */
+@InterfaceAudience.Private
+public class ReversedMobStoreScanner extends ReversedStoreScanner {
+
+  // Forwarded to HMobStore#resolve for every reference cell; read from the Scan at construction.
+  private boolean cacheMobBlocks = false;
+
+  ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+      long readPt) throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+    this.cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+  }
+
+  /**
+   * Fetches the next cells from the parent reversed scanner. Unless this is a raw MOB scan,
+   * every fetched cell that is a MOB reference cell is then replaced in place by the cell
+   * that {@link HMobStore#resolve} returns for it.
+   */
+  @Override
+  public boolean next(List<Cell> outResult, int limit) throws IOException {
+    final boolean parentResult = super.next(outResult, limit);
+    if (MobUtils.isRawMobScan(scan) || outResult.isEmpty()) {
+      // Raw scan or empty batch: hand the parent's cells back untouched.
+      return parentResult;
+    }
+    final HMobStore mobStore = (HMobStore) store;
+    final int total = outResult.size();
+    for (int pos = 0; pos < total; pos++) {
+      final Cell current = outResult.get(pos);
+      if (!MobUtils.isMobReferenceCell(current)) {
+        continue;
+      }
+      outResult.set(pos, mobStore.resolve(current, cacheMobBlocks));
+    }
+    return parentResult;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 27c64f0..13ded58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -915,7 +915,7 @@ public class StoreFile {
       return this.writer.getPath();
     }
 
-    boolean hasGeneralBloom() {
+    public boolean hasGeneralBloom() { // whether a general Bloom filter writer is present
       return this.generalBloomFilterWriter != null;
     }