Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/04 14:43:55 UTC

[2/2] git commit: HBASE-11643 Read and write MOB in HBase (Jingcheng Du)

HBASE-11643 Read and write MOB in HBase (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5c14f749
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5c14f749
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5c14f749

Branch: refs/heads/hbase-11339
Commit: 5c14f749b3196fe5d9e1efe6dd5d6d7356cc72d0
Parents: bcfc6d6
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Thu Sep 4 05:41:21 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Thu Sep 4 05:41:21 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/TagType.java   |   4 +
 .../src/main/resources/hbase-default.xml        |  29 ++
 .../master/handler/DeleteTableHandler.java      |  30 ++
 .../apache/hadoop/hbase/mob/CachedMobFile.java  | 114 +++++
 .../hbase/mob/DefaultMobStoreFlusher.java       | 217 +++++++++
 .../apache/hadoop/hbase/mob/MobCacheConfig.java |  56 +++
 .../apache/hadoop/hbase/mob/MobConstants.java   |  61 +++
 .../org/apache/hadoop/hbase/mob/MobFile.java    | 140 ++++++
 .../apache/hadoop/hbase/mob/MobFileCache.java   | 270 +++++++++++
 .../apache/hadoop/hbase/mob/MobFileName.java    | 169 +++++++
 .../apache/hadoop/hbase/mob/MobStoreEngine.java |  40 ++
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 263 +++++++++++
 .../hbase/regionserver/DefaultStoreEngine.java  |  16 +-
 .../hadoop/hbase/regionserver/HMobStore.java    | 268 +++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +
 .../hadoop/hbase/regionserver/HStore.java       |  48 +-
 .../hbase/regionserver/MobStoreScanner.java     |  69 +++
 .../regionserver/ReversedMobStoreScanner.java   |  69 +++
 .../hadoop/hbase/regionserver/StoreFile.java    |   2 +-
 .../apache/hadoop/hbase/mob/MobTestUtil.java    |  86 ++++
 .../hadoop/hbase/mob/TestCachedMobFile.java     | 154 ++++++
 .../hbase/mob/TestDefaultMobStoreFlusher.java   | 193 ++++++++
 .../hbase/mob/TestMobDataBlockEncoding.java     | 141 ++++++
 .../apache/hadoop/hbase/mob/TestMobFile.java    | 124 +++++
 .../hadoop/hbase/mob/TestMobFileCache.java      | 206 ++++++++
 .../hadoop/hbase/mob/TestMobFileName.java       |  79 ++++
 .../hbase/regionserver/TestDeleteMobTable.java  | 225 +++++++++
 .../hbase/regionserver/TestHMobStore.java       | 471 +++++++++++++++++++
 .../hbase/regionserver/TestMobStoreScanner.java | 318 +++++++++++++
 29 files changed, 3853 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index a21f7de..8613a35 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,8 @@ public final class TagType {
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
   public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
+
+  // mob tags
+  public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5;
+  public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6;
 }
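
For illustration (not part of the patch), a minimal sketch of how these new tag types are used by the flusher later in this diff; the table name is a placeholder:

    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TagType;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobTagSketch {
      public static void main(String[] args) {
        // DefaultMobStoreFlusher attaches a table-name tag of this type to every
        // reference cell it writes, and marks reference cells with MOB_REFERENCE_TAG_TYPE.
        Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes("mytable"));
        System.out.println("tag type: " + tableNameTag.getType());
      }
    }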

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index c724c27..872bbc5 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1454,4 +1454,33 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.http.staticuser.user</name>
     <value>dr.stack</value>
   </property>
+  <!-- Mob properties. -->
+  <property>
+    <name>hbase.mob.file.cache.size</name>
+    <value>1000</value>
+    <description>
+      Number of opened file handlers to cache.
+      A larger value will benefit reads by providing more file handlers per mob
+      file cache and reduce frequent file opening and closing.
+      However, if this is set too high, it could lead to a "too many opened file
+      handlers" error. The default value is 1000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.cache.evict.period</name>
+    <value>3600</value>
+    <description>
+      The amount of time in seconds before the mob cache evicts cached mob files.
+      The default value is 3600 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.cache.evict.remain.ratio</name>
+    <value>0.5f</value>
+    <description>
+      The ratio (between 0.0 and 1.0) of files that remain cached after an eviction
+      is triggered when the number of cached mob files exceeds hbase.mob.file.cache.size.
+      The default value is 0.5f.
+    </description>
+  </property>
 </configuration>
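
For illustration (not part of the patch), a sketch of overriding these defaults programmatically; the key names come from the descriptions above, the values are arbitrary examples:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MobCacheTuningSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Cache up to 2000 opened mob file handlers (default is 1000).
        conf.setInt("hbase.mob.file.cache.size", 2000);
        // Run the eviction every 1800 seconds (default is 3600).
        conf.setLong("hbase.mob.cache.evict.period", 1800L);
        // Keep 70% of the cached files after an eviction (default is 0.5f).
        conf.setFloat("hbase.mob.cache.evict.remain.ratio", 0.7f);
        System.out.println(conf.get("hbase.mob.file.cache.size"));
      }
    }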

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 730da73..b5cdae9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
 
 @InterfaceAudience.Private
 public class DeleteTableHandler extends TableEventHandler {
@@ -152,10 +156,36 @@ public class DeleteTableHandler extends TableEventHandler {
           tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
     }
 
+    // Archive the mob data if there is a mob-enabled column family
+    HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
+    boolean hasMob = false;
+    for (HColumnDescriptor hcd : hcds) {
+      if (MobUtils.isMobFamily(hcd)) {
+        hasMob = true;
+        break;
+      }
+    }
+    Path mobTableDir = null;
+    if (hasMob) {
+      // Archive mob data
+      mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME),
+          tableName);
+      Path regionDir =
+          new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
+      if (fs.exists(regionDir)) {
+        HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
+      }
+    }
     // 4. Delete table directory from FS (temp directory)
     if (!fs.delete(tempTableDir, true)) {
       LOG.error("Couldn't delete " + tempTableDir);
     }
+    // Delete the table directory where the mob files are saved
+    if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
+      if (!fs.delete(mobTableDir, true)) {
+        LOG.error("Couldn't delete " + mobTableDir);
+      }
+    }
 
     LOG.debug("Table '" + tableName + "' archived!");
   }
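
For illustration (not part of the patch), the mob directories this handler archives and deletes can be computed with the MobUtils helpers added later in this diff; the table and family names are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mob.MobUtils;

    public class MobPathSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("mytable"); // placeholder
        // {HBASE_DIR}/mobdir/{namespace}/{tableName}/{dummyRegionEncodedName}
        Path mobRegionPath = MobUtils.getMobRegionPath(conf, tableName);
        // {HBASE_DIR}/mobdir/{namespace}/{tableName}/{dummyRegionEncodedName}/{family}
        Path mobFamilyPath = MobUtils.getMobFamilyPath(conf, tableName, "mobcf");
        System.out.println(mobRegionPath + " / " + mobFamilyPath);
      }
    }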

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
new file mode 100644
index 0000000..457eb6c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * Cached mob file.
+ */
+@InterfaceAudience.Private
+public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> {
+
+  private long accessCount;
+  private AtomicLong referenceCount = new AtomicLong(0);
+
+  public CachedMobFile(StoreFile sf) {
+    super(sf);
+  }
+
+  public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
+      CacheConfig cacheConf) throws IOException {
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    return new CachedMobFile(sf);
+  }
+
+  public void access(long accessCount) {
+    this.accessCount = accessCount;
+  }
+
+  public int compareTo(CachedMobFile that) {
+    if (this.accessCount == that.accessCount) return 0;
+    return this.accessCount < that.accessCount ? 1 : -1;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof CachedMobFile)) {
+      return false;
+    }
+    return compareTo((CachedMobFile) obj) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(accessCount ^ (accessCount >>> 32));
+  }
+
+  /**
+   * Opens the mob file if it's not opened yet and increases the reference.
+   * It's not thread-safe. Use MobFileCache.openFile() instead.
+   * The reader of the mob file is opened only if it's not already open, no matter how many
+   * times this open() method is invoked.
+   * The reference is a counter of how many times this reader is referenced. When the
+   * reference drops to 0, the reader is closed.
+   */
+  @Override
+  public void open() throws IOException {
+    super.open();
+    referenceCount.incrementAndGet();
+  }
+
+  /**
+   * Decreases the reference count of the underlying reader for the mob file.
+   * It's not thread-safe. Use MobFileCache.closeFile() instead.
+   * The underlying reader isn't closed until the reference count reaches 0.
+   */
+  @Override
+  public void close() throws IOException {
+    long refs = referenceCount.decrementAndGet();
+    if (refs == 0) {
+      super.close();
+    }
+  }
+
+  /**
+   * Gets the reference count of the current mob file.
+   * For internal use only; currently it's used in tests.
+   * @return The reference count of the current mob file.
+   */
+  public long getReferenceCount() {
+    return this.referenceCount.longValue();
+  }
+}
\ No newline at end of file
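
For illustration (not part of the patch), a sketch of the reference counting above; the file path is a placeholder for an existing mob HFile:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.mob.CachedMobFile;

    public class CachedMobFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path mobFilePath = new Path("/hbase/mobdir/some/mob/file"); // placeholder
        CachedMobFile file = CachedMobFile.create(fs, mobFilePath, conf, new CacheConfig(conf));
        file.open();                                  // reference count 1, reader is opened
        file.open();                                  // reference count 2, reader is reused
        System.out.println(file.getReferenceCount()); // 2
        file.close();                                 // reference count 1, reader stays open
        file.close();                                 // reference count 0, reader is closed
      }
    }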

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
new file mode 100644
index 0000000..4a036a7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * An implementation of the StoreFlusher that extends the DefaultStoreFlusher.
+ * If the store is not a mob store, the flusher flushes the MemStore the same way as the
+ * DefaultStoreFlusher does.
+ * If the store is a mob store, the flusher flushes the MemStore into two places:
+ * the store files of HBase and the mob files.
+ * <ol>
+ * <li>Cells that are not of PUT type or that carry a delete marker are flushed directly to
+ * HBase.</li>
+ * <li>If the size of a cell value is larger than the threshold, it's flushed to a mob file,
+ * and a reference cell with the name of that file is flushed to HBase.</li>
+ * <li>If the size of a cell value is smaller than or equal to the threshold, it's flushed to
+ * HBase directly.</li>
+ * </ol>
+ *
+ */
+@InterfaceAudience.Private
+public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
+
+  private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
+  private final Object flushLock = new Object();
+  private long mobCellValueSizeThreshold = 0;
+  private Path targetPath;
+  private HMobStore mobStore;
+
+  public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
+    super(conf, store);
+    mobCellValueSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
+    this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
+        store.getColumnFamilyName());
+    if (!this.store.getFileSystem().exists(targetPath)) {
+      this.store.getFileSystem().mkdirs(targetPath);
+    }
+    this.mobStore = (HMobStore) store;
+  }
+
+  /**
+   * Flushes the snapshot of the MemStore.
+   * If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
+   * If the store is a mob one, the flusher flushes the MemStore into two places:
+   * the store files of HBase and the mob files.
+   * <ol>
+   * <li>Cells that are not of PUT type or that carry a delete marker are flushed directly to
+   * HBase.</li>
+   * <li>If the size of a cell value is larger than the threshold, it's flushed to a mob file,
+   * and a reference cell with the name of that file is flushed to HBase.</li>
+   * <li>If the size of a cell value is smaller than or equal to the threshold, it's flushed to
+   * HBase directly.</li>
+   * </ol>
+   */
+  @Override
+  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+      MonitoredTask status) throws IOException {
+    ArrayList<Path> result = new ArrayList<Path>();
+    int cellsCount = snapshot.getCellsCount();
+    if (cellsCount == 0) return result; // don't flush if there are no entries
+
+    // Use a store scanner to find which rows to flush.
+    long smallestReadPoint = store.getSmallestReadPoint();
+    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+    if (scanner == null) {
+      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+    }
+    StoreFile.Writer writer;
+    try {
+      // TODO: We can fail in the below block before we complete adding this flush to
+      // list of store files. Add cleanup of anything put on filesystem if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + store + ": creating writer");
+        // Write the map out to the disk
+        writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
+            false, true, true);
+        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
+        try {
+          // It's a mob store, flush the cells in a mob way. This is where flushing differs
+          // between a normal store and a mob store.
+          performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
+        } finally {
+          finalizeWriter(writer, cacheFlushId, status);
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+    LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
+        + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom()
+        + ", into tmp file " + writer.getPath());
+    result.add(writer.getPath());
+    return result;
+  }
+
+  /**
+   * Flushes the cells in the mob store.
+   * In the mob store, cells of PUT type may or may not carry a mob tag.
+   * <ol>
+   * <li>If a cell does not have a mob tag, where it's flushed depends on the value length.
+   * If the length is larger than the threshold, the cell is flushed to a mob file and a
+   * reference cell is flushed to a store file in HBase. Otherwise, the cell is directly
+   * flushed to a store file in HBase.</li>
+   * <li>If a cell has a mob tag, its value is a mob file name, and it's directly flushed
+   * to a store file in HBase.</li>
+   * </ol>
+   * @param snapshot Memstore snapshot.
+   * @param cacheFlushId Log cache flush sequence number.
+   * @param scanner The scanner of memstore snapshot.
+   * @param writer The store file writer.
+   * @param status Task that represents the flush operation and may be updated with status.
+   * @throws IOException
+   */
+  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
+      InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
+    StoreFile.Writer mobFileWriter = null;
+    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
+        HConstants.COMPACTION_KV_MAX_DEFAULT);
+    long mobKVCount = 0;
+    long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
+    mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
+        store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+    // the target path is {tableName}/.mob/{cfName}/mobFiles
+    // the relative path is mobFiles
+    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+    try {
+      Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+          .getName());
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore;
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        if (!cells.isEmpty()) {
+          for (Cell c : cells) {
+            // If we know that this KV is going to be included always, then let us
+            // set its memstoreTS to 0. This will help us save space when writing to
+            // disk.
+            KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+            if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv)
+                || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+              writer.append(kv);
+            } else {
+              // append the original keyValue in the mob file.
+              mobFileWriter.append(kv);
+              mobKVCount++;
+
+              // append the tags to the KeyValue.
+              // The key is same, the value is the filename of the mob file
+              KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+              writer.append(reference);
+            }
+          }
+          cells.clear();
+        }
+      } while (hasMore);
+    } finally {
+      status.setStatus("Flushing mob file " + store + ": appending metadata");
+      mobFileWriter.appendMetadata(cacheFlushId, false);
+      status.setStatus("Flushing mob file " + store + ": closing flushed file");
+      mobFileWriter.close();
+    }
+
+    if (mobKVCount > 0) {
+      // commit the mob file from temp folder to target folder.
+      // If the mob file is committed successfully but the store file is not,
+      // the committed mob file will be handled by the sweep tool as an unused
+      // file.
+      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
+    } else {
+      try {
+        // If the mob file is empty, delete it instead of committing.
+        store.getFileSystem().delete(mobFileWriter.getPath(), true);
+      } catch (IOException e) {
+        LOG.error("Fail to delete the temp mob file", e);
+      }
+    }
+  }
+}
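
For illustration (not part of the patch), a standalone sketch of the per-cell routing decision used in performMobFlush above; the threshold and the cell are made up for the example:

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobFlushRoutingSketch {
      public static void main(String[] args) {
        long mobCellValueSizeThreshold = 100 * 1024; // same as DEFAULT_MOB_THRESHOLD (100KB)
        KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("q"),
            System.currentTimeMillis(), new byte[200 * 1024]); // a 200KB value
        // Mirrors the condition in performMobFlush: small values, existing mob reference
        // cells and non-Put cells go to the normal store file; everything else goes to
        // the mob file, plus a reference cell in the store file.
        boolean toStoreFileOnly = kv.getValueLength() <= mobCellValueSizeThreshold
            || MobUtils.isMobReferenceCell(kv)
            || kv.getTypeByte() != KeyValue.Type.Put.getCode();
        System.out.println(toStoreFileOnly ? "store file only" : "mob file + reference cell");
      }
    }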

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
new file mode 100644
index 0000000..b160010
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
+/**
+ * The cache configuration for mob files.
+ */
+@InterfaceAudience.Private
+public class MobCacheConfig extends CacheConfig {
+
+  private static MobFileCache mobFileCache;
+
+  public MobCacheConfig(Configuration conf, HColumnDescriptor family) {
+    super(conf, family);
+    instantiateMobFileCache(conf);
+  }
+
+  /**
+   * Instantiates the MobFileCache.
+   * @param conf The current configuration.
+   */
+  public static synchronized void instantiateMobFileCache(Configuration conf) {
+    if (mobFileCache == null) {
+      mobFileCache = new MobFileCache(conf);
+    }
+  }
+
+  /**
+   * Gets the MobFileCache.
+   * @return The MobFileCache.
+   */
+  public MobFileCache getMobFileCache() {
+    return mobFileCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
new file mode 100644
index 0000000..d208722
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The constants used in the mob feature.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MobConstants {
+
+  public static final byte[] IS_MOB = Bytes.toBytes("isMob");
+  public static final byte[] MOB_THRESHOLD = Bytes.toBytes("mobThreshold");
+  public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
+
+  public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
+  public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+
+  public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
+  public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
+
+  public static final String MOB_DIR_NAME = "mobdir";
+  public static final String MOB_REGION_NAME = ".mob";
+  public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
+
+  public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
+  public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
+  public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
+      HConstants.EMPTY_BYTE_ARRAY);
+
+  public static final float DEFAULT_EVICT_REMAIN_RATIO = 0.5f;
+  public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
+
+  public final static String TEMP_DIR_NAME = ".tmp";
+  private MobConstants() {
+
+  }
+}
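
For illustration (not part of the patch), a sketch of marking a column family as a mob family through the IS_MOB and MOB_THRESHOLD descriptor values defined above; MobUtils.isMobFamily() and MobUtils.getMobThreshold() later in this diff read these values back. The family name and threshold are placeholders:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.mob.MobConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobFamilySketch {
      public static void main(String[] args) {
        HColumnDescriptor hcd = new HColumnDescriptor("mobcf"); // placeholder family name
        // Mark the family as a mob family and set a 200KB threshold. These are the raw
        // descriptor values read back by MobUtils.isMobFamily() / MobUtils.getMobThreshold().
        hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(true));
        hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(200L * 1024));
        System.out.println(Bytes.toBoolean(hcd.getValue(MobConstants.IS_MOB)));
      }
    }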

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
new file mode 100644
index 0000000..a120057
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+
+/**
+ * The mob file.
+ */
+@InterfaceAudience.Private
+public class MobFile {
+
+  private StoreFile sf;
+
+  // Internal use only, for subclasses.
+  protected MobFile() {
+  }
+
+  protected MobFile(StoreFile sf) {
+    this.sf = sf;
+  }
+
+  /**
+   * Internal use only. This is used by the sweeper.
+   *
+   * @return The store file scanner.
+   * @throws IOException
+   */
+  public StoreFileScanner getScanner() throws IOException {
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    sfs.add(sf);
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+        false, null, sf.getMaxMemstoreTS());
+
+    return sfScanners.get(0);
+  }
+
+  /**
+   * Reads a cell from the mob file.
+   * @param search The cell to search for in the mob file.
+   * @param cacheMobBlocks Whether this scanner should cache blocks.
+   * @return The cell in the mob file.
+   * @throws IOException
+   */
+  public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
+    Cell result = null;
+    StoreFileScanner scanner = null;
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    sfs.add(sf);
+    try {
+      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
+          cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS());
+      if (!sfScanners.isEmpty()) {
+        scanner = sfScanners.get(0);
+        if (scanner.seek(search)) {
+          result = scanner.peek();
+        }
+      }
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Gets the file name.
+   * @return The file name.
+   */
+  public String getFileName() {
+    return sf.getPath().getName();
+  }
+
+  /**
+   * Opens the underlying reader.
+   * It's not thread-safe. Use MobFileCache.openFile() instead.
+   * @throws IOException
+   */
+  public void open() throws IOException {
+    if (sf.getReader() == null) {
+      sf.createReader();
+    }
+  }
+
+  /**
+   * Closes the underlying reader, but does not evict blocks belonging to this file.
+   * It's not thread-safe. Use MobFileCache.closeFile() instead.
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    if (sf != null) {
+      sf.closeReader(false);
+      sf = null;
+    }
+  }
+
+  /**
+   * Creates an instance of the MobFile.
+   * @param fs The file system.
+   * @param path The path of the underlying StoreFile.
+   * @param conf The configuration.
+   * @param cacheConf The CacheConfig.
+   * @return An instance of the MobFile.
+   * @throws IOException
+   */
+  public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
+      throws IOException {
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    return new MobFile(sf);
+  }
+}
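
For illustration (not part of the patch), a sketch of reading one cell through a MobFile; the file path and row key are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.mob.MobFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobFileReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path mobFilePath = new Path("/hbase/mobdir/some/mob/file"); // placeholder
        MobFile mobFile = MobFile.create(fs, mobFilePath, conf, new CacheConfig(conf));
        mobFile.open();
        try {
          // Seek to the first possible cell of the row and read it, without caching blocks.
          KeyValue search = KeyValue.createFirstOnRow(Bytes.toBytes("row1"));
          Cell cell = mobFile.readCell(search, false);
          System.out.println(cell == null
              ? "not found" : Bytes.toStringBinary(CellUtil.cloneRow(cell)));
        } finally {
          mobFile.close();
        }
      }
    }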

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
new file mode 100644
index 0000000..97530b1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.IdLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cache for mob files.
+ * This cache doesn't cache the mob file blocks. It only caches references to open mob files,
+ * which avoids opening and closing mob files all the time; the open readers are simply kept
+ * around and reused.
+ */
+@InterfaceAudience.Private
+public class MobFileCache {
+
+  private static final Log LOG = LogFactory.getLog(MobFileCache.class);
+
+  /*
+   * Eviction and statistics thread. Runs periodically to print the statistics and
+   * evict the least recently used cached mob files when the count of the cached files
+   * is larger than the threshold.
+   */
+  static class EvictionThread extends Thread {
+    MobFileCache lru;
+
+    public EvictionThread(MobFileCache lru) {
+      super("MobFileCache.EvictionThread");
+      setDaemon(true);
+      this.lru = lru;
+    }
+
+    @Override
+    public void run() {
+      lru.evict();
+    }
+  }
+
+  // a ConcurrentHashMap, accesses to this map are synchronized.
+  private Map<String, CachedMobFile> map = null;
+  // caches access count
+  private final AtomicLong count;
+  private long lastAccess;
+  private final AtomicLong miss;
+  private long lastMiss;
+
+  // A lock that serializes evict() so that evictions occur one at a time.
+  // The method evictFile is not guarded by this lock; the ConcurrentHashMap handles that.
+  private final ReentrantLock evictionLock = new ReentrantLock(true);
+
+  // A striped lock on each mob file based on its hash, synchronizing openFile/closeFile.
+  private final IdLock keyLock = new IdLock();
+
+  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
+  private final Configuration conf;
+
+  // the count of the cached references to mob files
+  private final int mobFileMaxCacheSize;
+  private final boolean isCacheEnabled;
+  private float evictRemainRatio;
+
+  public MobFileCache(Configuration conf) {
+    this.conf = conf;
+    this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
+        MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
+    isCacheEnabled = (mobFileMaxCacheSize > 0);
+    map = new ConcurrentHashMap<String, CachedMobFile>(mobFileMaxCacheSize);
+    this.count = new AtomicLong(0);
+    this.miss = new AtomicLong(0);
+    this.lastAccess = 0;
+    this.lastMiss = 0;
+    if (isCacheEnabled) {
+      long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
+          MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
+      evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
+          MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
+      if (evictRemainRatio < 0.0) {
+        evictRemainRatio = 0.0f;
+        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
+      } else if (evictRemainRatio > 1.0) {
+        evictRemainRatio = 1.0f;
+        LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
+      }
+      this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
+          TimeUnit.SECONDS);
+    }
+    LOG.info("MobFileCache is initialized, and the cache size is " + mobFileMaxCacheSize);
+  }
+
+  /**
+   * Evicts the lru cached mob files when the count of the cached files is larger
+   * than the threshold.
+   */
+  public void evict() {
+    if (isCacheEnabled) {
+      // Ensure only one eviction at a time
+      if (!evictionLock.tryLock()) {
+        return;
+      }
+      printStatistics();
+      List<CachedMobFile> evictedFiles = new ArrayList<CachedMobFile>();
+      try {
+        if (map.size() <= mobFileMaxCacheSize) {
+          return;
+        }
+        List<CachedMobFile> files = new ArrayList<CachedMobFile>(map.values());
+        Collections.sort(files);
+        int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
+        if (start >= 0) {
+          for (int i = start; i < files.size(); i++) {
+            String name = files.get(i).getFileName();
+            CachedMobFile evictedFile = map.remove(name);
+            if (evictedFile != null) {
+              evictedFiles.add(evictedFile);
+            }
+          }
+        }
+      } finally {
+        evictionLock.unlock();
+      }
+      // EvictionLock is released. Close the evicted files one by one.
+      // The closes are sync in the closeFile method.
+      for (CachedMobFile evictedFile : evictedFiles) {
+        closeFile(evictedFile);
+      }
+    }
+  }
+
+  /**
+   * Evicts the cached file by the name.
+   * @param fileName The name of a cached file.
+   */
+  public void evictFile(String fileName) {
+    if (isCacheEnabled) {
+      IdLock.Entry lockEntry = null;
+      try {
+        // obtains the lock to close the cached file.
+        lockEntry = keyLock.getLockEntry(fileName.hashCode());
+        CachedMobFile evictedFile = map.remove(fileName);
+        if (evictedFile != null) {
+          evictedFile.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Fail to evict the file " + fileName, e);
+      } finally {
+        if (lockEntry != null) {
+          keyLock.releaseLockEntry(lockEntry);
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens a mob file.
+   * @param fs The current file system.
+   * @param path The file path.
+   * @param cacheConf The current MobCacheConfig
+   * @return An opened mob file.
+   * @throws IOException
+   */
+  public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
+    if (!isCacheEnabled) {
+      return MobFile.create(fs, path, conf, cacheConf);
+    } else {
+      String fileName = path.getName();
+      CachedMobFile cached = map.get(fileName);
+      IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
+      try {
+        if (cached == null) {
+          cached = map.get(fileName);
+          if (cached == null) {
+            if (map.size() > mobFileMaxCacheSize) {
+              evict();
+            }
+            cached = CachedMobFile.create(fs, path, conf, cacheConf);
+            cached.open();
+            map.put(fileName, cached);
+            miss.incrementAndGet();
+          }
+        }
+        cached.open();
+        cached.access(count.incrementAndGet());
+      } finally {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+      return cached;
+    }
+  }
+
+  /**
+   * Closes a mob file.
+   * @param file The mob file that needs to be closed.
+   */
+  public void closeFile(MobFile file) {
+    IdLock.Entry lockEntry = null;
+    try {
+      if (!isCacheEnabled) {
+        file.close();
+      } else {
+        lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
+        file.close();
+      }
+    } catch (IOException e) {
+      LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
+    } finally {
+      if (lockEntry != null) {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+    }
+  }
+
+  /**
+   * Gets the count of cached mob files.
+   * @return The count of the cached mob files.
+   */
+  public int getCacheSize() {
+    return map == null ? 0 : map.size();
+  }
+
+  /**
+   * Prints the statistics.
+   */
+  public void printStatistics() {
+    long access = count.get() - lastAccess;
+    long missed = miss.get() - lastMiss;
+    // Guard against division by zero when there has been no access since the last report.
+    long hitRate = (access == 0) ? 0 : (access - missed) * 100 / access;
+    LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
+        + (access - missed) + ", hit rate: " + hitRate + "%");
+    lastAccess += access;
+    lastMiss += missed;
+  }
+
+}
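
For illustration (not part of the patch), a sketch of opening and closing a mob file through the cache instead of directly; the column family and file path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.mob.MobCacheConfig;
    import org.apache.hadoop.hbase.mob.MobFile;
    import org.apache.hadoop.hbase.mob.MobFileCache;

    public class MobFileCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        MobFileCache cache = new MobFileCache(conf);
        MobCacheConfig cacheConf = new MobCacheConfig(conf, new HColumnDescriptor("mobcf"));
        Path mobFilePath = new Path("/hbase/mobdir/some/mob/file"); // placeholder
        // openFile() returns a CachedMobFile and bumps its reference count; repeated opens
        // of the same file name reuse the cached reader instead of reopening the HFile.
        MobFile file = cache.openFile(fs, mobFilePath, cacheConf);
        try {
          System.out.println("cached files: " + cache.getCacheSize());
        } finally {
          // closeFile() decreases the reference count; the reader is only really closed
          // once the cache evicts the file and all references are released.
          cache.closeFile(file);
        }
      }
    }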

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
new file mode 100644
index 0000000..937e965
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * The mob file name.
+ * It consists of the MD5 of a start key, a date and a UUID.
+ * It looks like md5(start) + date + uuid.
+ * <ol>
+ * <li>characters 0-31: the MD5 hex string of a start key. Since the length of the start key is
+ * not fixed, the MD5, which has a fixed length, is used instead.</li>
+ * <li>characters 32-39: a date string in the format yyyymmdd. The date is the latest timestamp
+ * of the cells in this file.</li>
+ * <li>the remaining characters: the UUID.</li>
+ * </ol>
+ * Using the MD5 hex string of the start key as the file name prefix distinguishes files that
+ * share a start key from files with other start keys.
+ * Cells coming from different regions might end up in the same mob file after a region split;
+ * this is allowed.
+ * The latest timestamp of the cells is kept in the file name so that expired mob files can
+ * easily be cleaned up by TTL. If this timestamp is older than the TTL, the file is regarded
+ * as expired.
+ */
+@InterfaceAudience.Private
+public class MobFileName {
+
+  private final String date;
+  private final String startKey;
+  private final String uuid;
+  private final String fileName;
+
+  /**
+   * @param startKey
+   *          The start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+   * @param uuid
+   *          The uuid
+   */
+  private MobFileName(byte[] startKey, String date, String uuid) {
+    this.startKey = MD5Hash.getMD5AsHex(startKey, 0, startKey.length);
+    this.uuid = uuid;
+    this.date = date;
+    this.fileName = this.startKey + date + uuid;
+  }
+
+  /**
+   * @param startKey
+   *          The md5 hex string of the start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+   * @param uuid
+   *          The uuid
+   */
+  private MobFileName(String startKey, String date, String uuid) {
+    this.startKey = startKey;
+    this.uuid = uuid;
+    this.date = date;
+    this.fileName = this.startKey + date + uuid;
+  }
+
+  /**
+   * Creates an instance of MobFileName
+   *
+   * @param startKey
+   *          The start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+   * @param uuid The uuid.
+   * @return An instance of a MobFileName.
+   */
+  public static MobFileName create(byte[] startKey, String date, String uuid) {
+    return new MobFileName(startKey, date, uuid);
+  }
+
+  /**
+   * Creates an instance of MobFileName
+   *
+   * @param startKey
+   *          The md5 hex string of the start key.
+   * @param date
+   *          The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+   * @param uuid The uuid.
+   * @return An instance of a MobFileName.
+   */
+  public static MobFileName create(String startKey, String date, String uuid) {
+    return new MobFileName(startKey, date, uuid);
+  }
+
+  /**
+   * Creates an instance of MobFileName.
+   * @param fileName The string format of a file name.
+   * @return An instance of a MobFileName.
+   */
+  public static MobFileName create(String fileName) {
+    // The format of a file name is md5HexString(0-31bytes) + date(32-39bytes) + UUID
+    // The date format is yyyyMMdd
+    String startKey = fileName.substring(0, 32);
+    String date = fileName.substring(32, 40);
+    String uuid = fileName.substring(40);
+    return new MobFileName(startKey, date, uuid);
+  }
+
+  /**
+   * Gets the hex string of the md5 for a start key.
+   * @return The hex string of the md5 for a start key.
+   */
+  public String getStartKey() {
+    return startKey;
+  }
+
+  /**
+   * Gets the date string. Its format is yyyymmdd.
+   * @return The date string.
+   */
+  public String getDate() {
+    return this.date;
+  }
+
+  @Override
+  public int hashCode() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(startKey);
+    builder.append(date);
+    builder.append(uuid);
+    return builder.toString().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object anObject) {
+    if (this == anObject) {
+      return true;
+    }
+    if (anObject instanceof MobFileName) {
+      MobFileName another = (MobFileName) anObject;
+      if (this.startKey.equals(another.startKey) && this.date.equals(another.date)
+          && this.uuid.equals(another.uuid)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Gets the file name.
+   * @return The file name.
+   */
+  public String getFileName() {
+    return this.fileName;
+  }
+}
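
For illustration (not part of the patch), a sketch of building and parsing a file name with the layout described above; the start key is arbitrary, and stripping the dashes from the UUID is an assumption about how the rest of the patch generates names:

    import java.util.Date;
    import java.util.UUID;

    import org.apache.hadoop.hbase.mob.MobFileName;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobFileNameSketch {
      public static void main(String[] args) {
        byte[] startKey = Bytes.toBytes("region-start-key");            // illustrative
        String date = MobUtils.formatDate(new Date());                  // yyyymmdd
        String uuid = UUID.randomUUID().toString().replaceAll("-", ""); // illustrative suffix
        MobFileName name = MobFileName.create(startKey, date, uuid);
        // The file name is md5(startKey) + date + uuid; the first 32 characters are the MD5,
        // the next 8 are the date, and the rest is the uuid.
        String fileName = name.getFileName();
        MobFileName parsed = MobFileName.create(fileName);
        System.out.println(parsed.getDate().equals(date)); // true
      }
    }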

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
new file mode 100644
index 0000000..d5e6f2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * MobStoreEngine creates the mob-specific store flusher; the default compactor and compaction
+ * policy are reused.
+ */
+@InterfaceAudience.Private
+public class MobStoreEngine extends DefaultStoreEngine {
+
+  @Override
+  protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
+    // When using MOB, we always use DefaultMobStoreFlusher.
+    // Just use the compactor and compaction policy from DefaultStoreEngine. We can have a MOB
+    // specific compactor and policy once those are implemented.
+    storeFlusher = new DefaultMobStoreFlusher(conf, store);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
new file mode 100644
index 0000000..149eed4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The mob utilities
+ */
+@InterfaceAudience.Private
+public class MobUtils {
+
+  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
+      new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return new SimpleDateFormat("yyyyMMdd");
+    }
+  };
+
+  /**
+   * Indicates whether the column family is a mob one.
+   * @param hcd The descriptor of a column family.
+   * @return True if this column family is a mob one, false if it's not.
+   */
+  public static boolean isMobFamily(HColumnDescriptor hcd) {
+    byte[] isMob = hcd.getValue(MobConstants.IS_MOB);
+    return isMob != null && isMob.length == 1 && Bytes.toBoolean(isMob);
+  }
+
+  /**
+   * Gets the mob threshold.
+   * If the size of a cell value is larger than this threshold, it's regarded as a mob.
+   * The default threshold is 100KB (1024*100 bytes).
+   * @param hcd The descriptor of a column family.
+   * @return The threshold.
+   */
+  public static long getMobThreshold(HColumnDescriptor hcd) {
+    byte[] threshold = hcd.getValue(MobConstants.MOB_THRESHOLD);
+    return threshold != null && threshold.length == Bytes.SIZEOF_LONG ? Bytes.toLong(threshold)
+        : MobConstants.DEFAULT_MOB_THRESHOLD;
+  }
+
+  /**
+   * Formats a date to a string.
+   * @param date The date.
+   * @return The string format of the date, in yyyymmdd.
+   */
+  public static String formatDate(Date date) {
+    return LOCAL_FORMAT.get().format(date);
+  }
+
+  /**
+   * Parses the string to a date.
+   * @param dateString The string format of a date, in yyyymmdd.
+   * @return A date.
+   * @throws ParseException
+   */
+  public static Date parseDate(String dateString) throws ParseException {
+    return LOCAL_FORMAT.get().parse(dateString);
+  }
+
+  /**
+   * Whether the current cell is a mob reference cell.
+   * @param cell The current cell.
+   * @return True if the cell has a mob reference tag, false if it doesn't.
+   */
+  public static boolean isMobReferenceCell(Cell cell) {
+    if (cell.getTagsLength() > 0) {
+      Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+          TagType.MOB_REFERENCE_TAG_TYPE);
+      return tag != null;
+    }
+    return false;
+  }
+
+  /**
+   * Whether the tag list has a mob reference tag.
+   * @param tags The tag list.
+   * @return True if the list has a mob reference tag, false if it doesn't.
+   */
+  public static boolean hasMobReferenceTag(List<Tag> tags) {
+    if (!tags.isEmpty()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Indicates whether it's a raw scan.
+   * The information is set in the attribute "hbase.mob.scan.raw" of the scan.
+   * For a mob cell, in a normal scan the scanner retrieves the mob cell from the mob file.
+   * In a raw scan, the scanner directly returns the reference cell stored in HBase without
+   * retrieving the one in the mob file.
+   * @param scan The current scan.
+   * @return True if it's a raw scan.
+   */
+  public static boolean isRawMobScan(Scan scan) {
+    byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
+    try {
+      return raw != null && Bytes.toBoolean(raw);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Indicates whether the scan requests caching of the MOB blocks.
+   * The information is set in the scan attribute "hbase.mob.cache.blocks".
+   * @param scan The current scan.
+   * @return True when the Scan attribute specifies to cache the MOB blocks.
+   */
+  public static boolean isCacheMobBlocks(Scan scan) {
+    byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
+    try {
+      return cache != null && Bytes.toBoolean(cache);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Sets the attribute of caching blocks in the scan.
+   *
+   * @param scan
+   *          The current scan.
+   * @param cacheBlocks
+   *          True if the scanner created for this scan should cache blocks,
+   *          false otherwise.
+   */
+  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
+    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
+  }
+
+  /**
+   * Gets the root dir of the mob files.
+   * It's {HBASE_DIR}/mobdir.
+   * @param conf The current configuration.
+   * @return the root dir of the mob file.
+   */
+  public static Path getMobHome(Configuration conf) {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+  }
+
+  /**
+   * Gets the region dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @return The region dir of the mob files.
+   */
+  public static Path getMobRegionPath(Configuration conf, TableName tableName) {
+    Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    HRegionInfo regionInfo = getMobRegionInfo(tableName);
+    return new Path(tablePath, regionInfo.getEncodedName());
+  }
+
+  /**
+   * Gets the family dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param familyName The current family name.
+   * @return The family dir of the mob files.
+   */
+  public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+    return new Path(getMobRegionPath(conf, tableName), familyName);
+  }
+
+  /**
+   * Gets the family dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+   * @param regionPath The path of mob region which is a dummy one.
+   * @param familyName The current family name.
+   * @return The family dir of the mob files.
+   */
+  public static Path getMobFamilyPath(Path regionPath, String familyName) {
+    return new Path(regionPath, familyName);
+  }
+
+  /**
+   * Gets the HRegionInfo of the mob files.
+   * This is a dummy region; mob files are not saved in a real HBase region.
+   * It is used internally, only for mob snapshots.
+   * @param tableName The current table name.
+   * @return A dummy mob region info.
+   */
+  public static HRegionInfo getMobRegionInfo(TableName tableName) {
+    HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
+        HConstants.EMPTY_END_ROW, false, 0);
+    return info;
+  }
+
+  /**
+   * Creates a mob reference KeyValue.
+   * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
+   * @param kv The original KeyValue.
+   * @param fileName The name of the mob file that the original mob value is written to.
+   * @param tableNameTag The tag of the current table name. It is needed when cloning
+   *                        a snapshot.
+   * @return The mob reference KeyValue.
+   */
+  public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+    // Append the tags to the KeyValue.
+    // The key stays the same; the value becomes the original value length followed by the
+    // name of the mob file.
+    List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
+    existingTags.add(MobConstants.MOB_REF_TAG);
+    // Add the tag of the source table name, i.e. the table this mob file is flushed from.
+    // It is needed when cloning a snapshot: when reading from the cloned table, the original
+    // mob files are located by this table name. For details please see the design of cloning
+    // snapshots for mob files.
+    existingTags.add(tableNameTag);
+    long valueLength = kv.getValueLength();
+    byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
+    KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
+        kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
+        refValue, 0, refValue.length, existingTags);
+    reference.setSequenceId(kv.getSequenceId());
+    return reference;
+  }
+}

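For context, the checks above read plain descriptor values and scan attributes, so enabling MOB on a column family and requesting a raw scan only requires setting them. A minimal client-side sketch, assuming the MobConstants keys referenced above (IS_MOB, MOB_THRESHOLD, MOB_SCAN_RAW) have the byte[]/String types implied by the getValue/getAttribute calls; the family name "mobcf" is purely illustrative:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class MobFamilyExample {
  public static void main(String[] args) {
    // Mark the family "mobcf" as a mob family with a 100KB threshold; these are the values
    // read back by MobUtils.isMobFamily() and MobUtils.getMobThreshold().
    HColumnDescriptor hcd = new HColumnDescriptor("mobcf");
    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(true));
    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(100L * 1024));

    // Request a raw mob scan: reference cells are returned as stored in HBase, without
    // resolving them against the mob files (see MobUtils.isRawMobScan()).
    Scan scan = new Scan();
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
  }
}
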
http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index f2a0b06..4afa80c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -63,6 +63,12 @@ public class DefaultStoreEngine extends StoreEngine<
   protected void createComponents(
       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
     storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
+    createCompactor(conf, store);
+    createCompactionPolicy(conf, store);
+    createStoreFlusher(conf, store);
+  }
+
+  protected void createCompactor(Configuration conf, Store store) throws IOException {
     String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
     try {
       compactor = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -70,7 +76,10 @@ public class DefaultStoreEngine extends StoreEngine<
     } catch (Exception e) {
       throw new IOException("Unable to load configured compactor '" + className + "'", e);
     }
-    className = conf.get(
+  }
+
+  protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
+    String className = conf.get(
         DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
     try {
       compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -79,7 +88,10 @@ public class DefaultStoreEngine extends StoreEngine<
     } catch (Exception e) {
       throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
     }
-    className = conf.get(
+  }
+
+  protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
+    String className = conf.get(
         DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
     try {
       storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,

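Splitting createComponents into createCompactor, createCompactionPolicy and createStoreFlusher lets a subclass replace a single component. As a rough sketch (the MobStoreEngine added by this patch presumably does something along these lines, but this SketchMobStoreEngine and the assumed DefaultMobStoreFlusher constructor are illustrative only):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mob.DefaultMobStoreFlusher;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.Store;

// Keeps the default compactor and compaction policy, swaps in a mob-aware flusher.
public class SketchMobStoreEngine extends DefaultStoreEngine {
  @Override
  protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
    // Assumes DefaultMobStoreFlusher extends DefaultStoreFlusher and mirrors its
    // (Configuration, Store) constructor.
    storeFlusher = new DefaultMobStoreFlusher(conf, store);
  }
}
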
http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
new file mode 100644
index 0000000..6b4e450
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -0,0 +1,268 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobStoreEngine;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The store implementation to save MOBs (medium objects). It extends HStore.
+ * When the descriptor of a column family marks it as a mob one, the HRegion that
+ * instantiates a store for this column family creates an HMobStore instead of an HStore.
+ * HMobStore is almost the same as HStore except that it uses different types of scanners:
+ * in getScanner, a MobStoreScanner or ReversedMobStoreScanner is returned. In these scanners,
+ * an additional seek into the mob files is performed after the seek in HBase is done.
+ */
+@InterfaceAudience.Private
+public class HMobStore extends HStore {
+
+  private MobCacheConfig mobCacheConfig;
+  private Path homePath;
+  private Path mobFamilyPath;
+
+  public HMobStore(final HRegion region, final HColumnDescriptor family,
+      final Configuration confParam) throws IOException {
+    super(region, family, confParam);
+    this.mobCacheConfig = (MobCacheConfig) cacheConf;
+    this.homePath = MobUtils.getMobHome(conf);
+    this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
+        family.getNameAsString());
+  }
+
+  /**
+   * Creates the mob cache config.
+   */
+  @Override
+  protected void createCacheConf(HColumnDescriptor family) {
+    cacheConf = new MobCacheConfig(conf, family);
+  }
+
+  /**
+   * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, an additional seek
+   * into the mob files is performed after the seek in HBase is done.
+   */
+  @Override
+  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+      long readPt, KeyValueScanner scanner) throws IOException {
+    if (scanner == null) {
+      scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+          targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+    }
+    return scanner;
+  }
+
+  /**
+   * Creates the mob store engine.
+   */
+  @Override
+  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+      KVComparator kvComparator) throws IOException {
+    MobStoreEngine engine = new MobStoreEngine();
+    engine.createComponents(conf, store, kvComparator);
+    return engine;
+  }
+
+  /**
+   * Gets the temp directory.
+   * @return The temp directory.
+   */
+  private Path getTempDir() {
+    return new Path(homePath, MobConstants.TEMP_DIR_NAME);
+  }
+
+  /**
+   * Creates a writer for the mob file in the temp directory used by flushes.
+   * @param date The latest date of the cells being flushed.
+   * @param maxKeyCount The maximum key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey) throws IOException {
+    if (startKey == null) {
+      startKey = HConstants.EMPTY_START_ROW;
+    }
+    Path path = getTempDir();
+    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+  }
+
+  /**
+   * Creates a writer for the mob file under the given base path.
+   * @param date The date string, in the yyyyMMdd format.
+   * @param basePath The base path of the temp directory.
+   * @param maxKeyCount The maximum key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey) throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
+        .toString().replaceAll("-", ""));
+    final CacheConfig writerCacheConf = mobCacheConfig;
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withIncludesMvcc(false).withIncludesTags(true)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+        .withBlockSize(getFamily().getBlocksize())
+        .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
+        .withFilePath(new Path(basePath, mobFileName.getFileName()))
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Commits the mob file.
+   * @param sourceFile The source file.
+   * @param targetPath The directory path where the source file is renamed to.
+   * @throws IOException
+   */
+  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+    if (sourceFile == null) {
+      return;
+    }
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateMobFile(sourceFile);
+    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+    LOG.info(msg);
+    Path parent = dstPath.getParent();
+    if (!region.getFilesystem().exists(parent)) {
+      region.getFilesystem().mkdirs(parent);
+    }
+    if (!region.getFilesystem().rename(sourceFile, dstPath)) {
+      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+    }
+  }
+
+  /**
+   * Validates a mob file by opening and closing it.
+   *
+   * @param path the path to the mob file
+   */
+  private void validateMobFile(Path path) throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile =
+          new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader(false);
+      }
+    }
+  }
+
+  /**
+   * Reads the cell from the mob file.
+   * @param reference The reference cell found in HBase; its value is the original value length
+   *                  followed by the name of the mob file.
+   * @param cacheBlocks Whether the scanner should cache blocks.
+   * @return The cell found in the mob file.
+   * @throws IOException
+   */
+  public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+    Cell result = null;
+    if (reference.getValueLength() > Bytes.SIZEOF_LONG) {
+      String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
+          + Bytes.SIZEOF_LONG, reference.getValueLength() - Bytes.SIZEOF_LONG);
+      Path targetPath = new Path(mobFamilyPath, fileName);
+      MobFile file = null;
+      try {
+        file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
+            mobCacheConfig);
+        result = file.readCell(reference, cacheBlocks);
+      } catch (IOException e) {
+        LOG.error("Fail to open/read the mob file " + targetPath.toString(), e);
+      } catch (NullPointerException e) {
+        // If the file is deleted during the scan, the HDFS getBlockRange call may
+        // throw a NullPointerException; catch it and handle it here.
+        LOG.error("Fail to read the mob file " + targetPath.toString()
+            + " since it's already deleted", e);
+      } finally {
+        if (file != null) {
+          mobCacheConfig.getMobFileCache().closeFile(file);
+        }
+      }
+    } else {
+      LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short");
+    }
+
+    if (result == null) {
+      LOG.warn("The KeyValue result is null, assembling a new KeyValue with the same row, family,"
+          + " qualifier, timestamp, type and tags but with an empty value to return.");
+      result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+          reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+          reference.getFamilyLength(), reference.getQualifierArray(),
+          reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
+          Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
+          0, 0, reference.getTagsArray(), reference.getTagsOffset(),
+          reference.getTagsLength());
+    }
+    return result;
+  }
+
+  /**
+   * Gets the directory path of the mob files for this column family.
+   * @return The mob family directory path.
+   */
+  public Path getPath() {
+    return mobFamilyPath;
+  }
+}

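The reference value parsed by resolve() uses the layout produced by MobUtils.createMobRefKeyValue: an 8-byte long holding the original value length, followed by the mob file name. A small sketch of decoding it, e.g. for reference cells returned by a raw mob scan; MobReferenceDecoder is a hypothetical helper, not part of this patch:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.Bytes;

public class MobReferenceDecoder {
  /** Returns the name of the mob file the reference cell points to. */
  public static String mobFileName(Cell reference) {
    if (reference.getValueLength() <= Bytes.SIZEOF_LONG) {
      throw new IllegalArgumentException("Not a mob reference cell");
    }
    return Bytes.toString(reference.getValueArray(),
        reference.getValueOffset() + Bytes.SIZEOF_LONG,
        reference.getValueLength() - Bytes.SIZEOF_LONG);
  }

  /** Returns the length of the original mob value recorded in the reference cell. */
  public static long originalValueLength(Cell reference) {
    return Bytes.toLong(reference.getValueArray(), reference.getValueOffset(), Bytes.SIZEOF_LONG);
  }
}
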
http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b879e8a..c87c12b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -3552,6 +3553,9 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+    if (MobUtils.isMobFamily(family)) {
+      return new HMobStore(this, family, this.conf);
+    }
     return new HStore(this, family, this.conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8d20d7b..45edf7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -132,11 +133,11 @@ public class HStore implements Store {
 
   protected final MemStore memstore;
   // This stores directory in the filesystem.
-  private final HRegion region;
+  protected final HRegion region;
   private final HColumnDescriptor family;
   private final HRegionFileSystem fs;
-  private final Configuration conf;
-  private final CacheConfig cacheConf;
+  protected final Configuration conf;
+  protected CacheConfig cacheConf;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
@@ -244,7 +245,7 @@ public class HStore implements Store {
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
-    this.cacheConf = new CacheConfig(conf, family);
+    createCacheConf(family);
 
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
@@ -263,7 +264,7 @@ public class HStore implements Store {
           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
     }
 
-    this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
+    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
     this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
 
     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -338,6 +339,27 @@ public class HStore implements Store {
   }
 
   /**
+   * Creates the cache config.
+   * @param family The current column family.
+   */
+  protected void createCacheConf(final HColumnDescriptor family) {
+    this.cacheConf = new CacheConfig(conf, family);
+  }
+
+  /**
+   * Creates the store engine configured for the given Store.
+   * @param store The store. An unfortunate dependency needed due to it
+   *              being passed to coprocessors via the compactor.
+   * @param conf Store configuration.
+   * @param kvComparator KVComparator for storeFileManager.
+   * @return StoreEngine to use.
+   */
+  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+      KVComparator kvComparator) throws IOException {
+    return StoreEngine.create(store, conf, comparator);
+  }
+
+  /**
    * @param family
    * @return TTL in seconds of the specified family
    */
@@ -1886,17 +1908,23 @@ public class HStore implements Store {
       if (this.getCoprocessorHost() != null) {
         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
       }
-      if (scanner == null) {
-        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
-            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
-            getScanInfo(), scan, targetCols, readPt);
-      }
+      scanner = createScanner(scan, targetCols, readPt, scanner);
       return scanner;
     } finally {
       lock.readLock().unlock();
     }
   }
 
+  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+      long readPt, KeyValueScanner scanner) throws IOException {
+    if (scanner == null) {
+      scanner = scan.isReversed() ? new ReversedStoreScanner(this,
+          getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
+          getScanInfo(), scan, targetCols, readPt);
+    }
+    return scanner;
+  }
+
   @Override
   public String toString() {
     return this.getColumnFamilyName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
new file mode 100644
index 0000000..b4bcbe7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * Scanner that scans both the memstore and the MOB store, coalescing the KeyValue stream into a
+ * list of cells for a single row.
+ *
+ */
+@InterfaceAudience.Private
+public class MobStoreScanner extends StoreScanner {
+
+  private boolean cacheMobBlocks = false;
+
+  public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      final NavigableSet<byte[]> columns, long readPt) throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+  }
+
+  /**
+   * First reads the cells from HBase. If a cell is a reference cell (it carries the mob
+   * reference tag), the scanner seeks the cell in the mob file and uses the cell found
+   * there as the result.
+   */
+  @Override
+  public boolean next(List<Cell> outResult, int limit) throws IOException {
+    boolean result = super.next(outResult, limit);
+    if (!MobUtils.isRawMobScan(scan)) {
+      // retrieve the mob data
+      if (outResult.isEmpty()) {
+        return result;
+      }
+      HMobStore mobStore = (HMobStore) store;
+      for (int i = 0; i < outResult.size(); i++) {
+        Cell cell = outResult.get(i);
+        if (MobUtils.isMobReferenceCell(cell)) {
+          outResult.set(i, mobStore.resolve(cell, cacheMobBlocks));
+        }
+      }
+    }
+    return result;
+  }
+}

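From the client's point of view a mob family is scanned like any other family; the reference cells are resolved on the region server by this scanner. A minimal sketch of such a scan; MobUtils is @InterfaceAudience.Private, so this mirrors how the tests in this patch exercise the scanner rather than a public client API, and the table name "testMob" is purely illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobUtils;

public class MobScanExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testMob");
    try {
      Scan scan = new Scan();
      // Ask the region server to cache the blocks of the mob files read by this scan.
      MobUtils.setCacheMobBlocks(scan, true);
      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result result : scanner) {
          // Each Result carries the full mob values, not the reference cells.
          System.out.println(result);
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}
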
http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
new file mode 100644
index 0000000..e384390
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * ReversedMobStoreScanner extends ReversedStoreScanner and supports reversed scanning over
+ * both the memstore and the MOB store.
+ *
+ */
+@InterfaceAudience.Private
+public class ReversedMobStoreScanner extends ReversedStoreScanner {
+
+  private boolean cacheMobBlocks = false;
+
+  ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+      long readPt) throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+  }
+
+  /**
+   * First reads the cells from HBase. If a cell is a reference cell (it carries the mob
+   * reference tag), the scanner seeks the cell in the mob file and uses the cell found
+   * there as the result.
+   */
+  @Override
+  public boolean next(List<Cell> outResult, int limit) throws IOException {
+    boolean result = super.next(outResult, limit);
+    if (!MobUtils.isRawMobScan(scan)) {
+      // retrieve the mob data
+      if (outResult.isEmpty()) {
+        return result;
+      }
+      HMobStore mobStore = (HMobStore) store;
+      for (int i = 0; i < outResult.size(); i++) {
+        Cell cell = outResult.get(i);
+        if (MobUtils.isMobReferenceCell(cell)) {
+          outResult.set(i, mobStore.resolve(cell, cacheMobBlocks));
+        }
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5c14f749/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 27c64f0..13ded58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -915,7 +915,7 @@ public class StoreFile {
       return this.writer.getPath();
     }
 
-    boolean hasGeneralBloom() {
+    public boolean hasGeneralBloom() {
       return this.generalBloomFilterWriter != null;
     }