You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/02/22 21:56:20 UTC

[47/50] [abbrv] hbase git commit: Merge branch 'master' (2/11/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 3c8fa87,0000000..aba81eb
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@@ -1,546 -1,0 +1,548 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Date;
 +import java.util.List;
 +import java.util.NavigableSet;
 +import java.util.UUID;
 +
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValue.KVComparator;
 +import org.apache.hadoop.hbase.KeyValue.Type;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.filter.Filter;
 +import org.apache.hadoop.hbase.filter.FilterList;
 +import org.apache.hadoop.hbase.io.compress.Compression;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFile;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.master.TableLockManager;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.MobCacheConfig;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFile;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobStoreEngine;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
++import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 +
 +/**
 + * The store implementation to save MOBs (medium objects); it extends the HStore.
 + * When a descriptor of a column family has the value "IS_MOB", it means this column family
 + * is a mob one. When an HRegion instantiates a store for this column family, the HMobStore is
 + * created.
 + * HMobStore is almost the same as the HStore except using different types of scanners.
 + * In the method of getScanner, the MobStoreScanner and ReversedMobStoreScanner are returned.
 + * In these scanners, an additional seek in the mob files is performed after the seek
 + * in HBase is done.
 + * The store implements how we save MOBs by extending HStore. When a descriptor
 + * of a column family has the value "IS_MOB", it means this column family is a mob one. When an
 + * HRegion instantiates a store for this column family, the HMobStore is created. HMobStore is
 + * almost the same as the HStore except using different types of scanners. In the method of
 + * getScanner, the MobStoreScanner and ReversedMobStoreScanner are returned. In these scanners, an
 + * additional seek in the mob files is performed after the seek in HBase is done.
 + */
 +@InterfaceAudience.Private
 +public class HMobStore extends HStore {
 +
 +  private MobCacheConfig mobCacheConfig;
 +  private Path homePath;
 +  private Path mobFamilyPath;
 +  private volatile long mobCompactedIntoMobCellsCount = 0;
 +  private volatile long mobCompactedFromMobCellsCount = 0;
 +  private volatile long mobCompactedIntoMobCellsSize = 0;
 +  private volatile long mobCompactedFromMobCellsSize = 0;
 +  private volatile long mobFlushCount = 0;
 +  private volatile long mobFlushedCellsCount = 0;
 +  private volatile long mobFlushedCellsSize = 0;
 +  private volatile long mobScanCellsCount = 0;
 +  private volatile long mobScanCellsSize = 0;
 +  private List<Path> mobDirLocations;
 +  private HColumnDescriptor family;
 +  private TableLockManager tableLockManager;
 +  private TableName tableLockName;
 +
 +  public HMobStore(final HRegion region, final HColumnDescriptor family,
 +      final Configuration confParam) throws IOException {
 +    super(region, family, confParam);
 +    this.family = family;
 +    this.mobCacheConfig = (MobCacheConfig) cacheConf;
 +    this.homePath = MobUtils.getMobHome(conf);
 +    this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
 +        family.getNameAsString());
 +    mobDirLocations = new ArrayList<Path>();
 +    mobDirLocations.add(mobFamilyPath);
 +    TableName tn = region.getTableDesc().getTableName();
 +    mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
 +        .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
 +    if (region.getRegionServerServices() != null) {
 +      tableLockManager = region.getRegionServerServices().getTableLockManager();
 +      tableLockName = MobUtils.getTableLockName(getTableName());
 +    }
 +  }
 +
 +  /**
 +   * Creates the mob cache config.
 +   */
 +  @Override
 +  protected void createCacheConf(HColumnDescriptor family) {
 +    cacheConf = new MobCacheConfig(conf, family);
 +  }
 +
 +  /**
 +   * Gets current config.
 +   */
 +  public Configuration getConfiguration() {
 +    return this.conf;
 +  }
 +
 +  /**
 +   * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, an additional seek in
 +   * the mob files is performed after the seek in HBase is done.
 +   */
 +  @Override
 +  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
 +      long readPt, KeyValueScanner scanner) throws IOException {
 +    if (scanner == null) {
 +      if (MobUtils.isRefOnlyScan(scan)) {
 +        Filter refOnlyFilter = new MobReferenceOnlyFilter();
 +        Filter filter = scan.getFilter();
 +        if (filter != null) {
 +          scan.setFilter(new FilterList(filter, refOnlyFilter));
 +        } else {
 +          scan.setFilter(refOnlyFilter);
 +        }
 +      }
 +      scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
 +          targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
 +    }
 +    return scanner;
 +  }
 +
 +  /**
 +   * Creates the mob store engine.
 +   */
 +  @Override
 +  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
 +      KVComparator kvComparator) throws IOException {
 +    MobStoreEngine engine = new MobStoreEngine();
 +    engine.createComponents(conf, store, kvComparator);
 +    return engine;
 +  }
 +
 +  /**
 +   * Gets the temp directory.
 +   * @return The temp directory.
 +   */
 +  private Path getTempDir() {
 +    return new Path(homePath, MobConstants.TEMP_DIR_NAME);
 +  }
 +
 +  /**
 +   * Creates the writer for the mob file in temp directory.
 +   * @param date The latest date of written cells.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey) throws IOException {
 +    if (startKey == null) {
 +      startKey = HConstants.EMPTY_START_ROW;
 +    }
 +    Path path = getTempDir();
 +    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
 +  }
 +
 +  /**
 +   * Creates the writer for the del file in temp directory.
 +   * The del file keeps tracking the delete markers. Its name has a suffix _del,
 +   * the format is [0-9a-f]+(_del)?.
 +   * @param date The latest date of written cells.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @return The writer for the del file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey) throws IOException {
 +    if (startKey == null) {
 +      startKey = HConstants.EMPTY_START_ROW;
 +    }
 +    Path path = getTempDir();
 +    String suffix = UUID
 +        .randomUUID().toString().replaceAll("-", "") + "_del";
 +    MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
 +    return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
 +  }
 +
 +  /**
 +   * Creates the writer for the mob file in temp directory.
 +   * @param date The date string, its format is yyyyMMdd.
 +   * @param basePath The basic path for a temp directory.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey) throws IOException {
 +    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
 +        .toString().replaceAll("-", ""));
 +    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
 +  }
 +
 +  /**
 +   * Creates the writer for the mob file in temp directory.
 +   * @param mobFileName The mob file name.
 +   * @param basePath The basic path for a temp directory.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression) throws IOException {
 +    final CacheConfig writerCacheConf = mobCacheConfig;
 +    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
 +        .withIncludesMvcc(false).withIncludesTags(true)
 +        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
 +        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
 +        .withBlockSize(getFamily().getBlocksize())
 +        .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build();
 +
 +    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
 +        .withFilePath(new Path(basePath, mobFileName.getFileName()))
 +        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
 +        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
 +    return w;
 +  }
 +
 +  /**
 +   * Commits the mob file.
 +   * @param sourceFile The source file.
 +   * @param targetPath The directory path where the source file is renamed to.
 +   * @throws IOException
 +   */
 +  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
 +    if (sourceFile == null) {
 +      return;
 +    }
 +    Path dstPath = new Path(targetPath, sourceFile.getName());
 +    validateMobFile(sourceFile);
 +    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
 +    LOG.info(msg);
 +    Path parent = dstPath.getParent();
 +    if (!region.getFilesystem().exists(parent)) {
 +      region.getFilesystem().mkdirs(parent);
 +    }
 +    if (!region.getFilesystem().rename(sourceFile, dstPath)) {
 +      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
 +    }
 +  }
 +
 +  /**
 +   * Validates a mob file by opening and closing it.
 +   *
 +   * @param path the path to the mob file
 +   */
 +  private void validateMobFile(Path path) throws IOException {
 +    StoreFile storeFile = null;
 +    try {
 +      storeFile =
 +          new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
 +      storeFile.createReader();
 +    } catch (IOException e) {
 +      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
 +      throw e;
 +    } finally {
 +      if (storeFile != null) {
 +        storeFile.closeReader(false);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Reads the cell from the mob file.
 +   * @param reference The cell found in HBase; its value is a path to a mob file.
 +   * @param cacheBlocks Whether the scanner should cache blocks.
 +   * @return The cell found in the mob file.
 +   * @throws IOException
 +   */
 +  public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
 +    Cell result = null;
 +    if (MobUtils.hasValidMobRefCellValue(reference)) {
 +      String fileName = MobUtils.getMobFileName(reference);
 +      result = readCell(mobDirLocations, fileName, reference, cacheBlocks);
 +      if (result == null) {
 +        result = readClonedCell(fileName, reference, cacheBlocks);
 +      }
 +    }
 +    if (result == null) {
 +      LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
 +          + "qualifier,timestamp,type and tags but with an empty value to return.");
 +      result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
 +          reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
 +          reference.getFamilyLength(), reference.getQualifierArray(),
 +          reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
 +          Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
 +          0, 0, reference.getTagsArray(), reference.getTagsOffset(),
 +          reference.getTagsLength());
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Reads the cell from a mob file, trying each candidate directory in order.
 +   * A missing file in one location is logged and the next location is tried; any other
 +   * IOException is rethrown. The cached entry for the file is evicted on every IOException,
 +   * and a file that was opened is always closed through the mob file cache.
 +   * Callers pass either the directories of this store or those of a source table.
 +   * @param locations The possible locations where the mob files are saved.
 +   * @param fileName The file to be read.
 +   * @param search The cell to be searched.
 +   * @param cacheMobBlocks Whether the scanner should cache blocks.
 +   * @return The result of reading the first file that opens, or null if no location has it.
 +   * @throws IOException If opening or reading fails for a reason other than a missing file.
 +   */
 +  private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks)
 +      throws IOException {
 +    FileSystem fs = getFileSystem();
 +    for (Path location : locations) {
 +      MobFile file = null;
 +      Path path = new Path(location, fileName);
 +      try {
 +        file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
 +        return file.readCell(search, cacheMobBlocks);
 +      } catch (IOException e) {
 +        mobCacheConfig.getMobFileCache().evictFile(fileName);
 +        if (e instanceof FileNotFoundException) {
 +          LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
 +        } else {
 +          throw e;
 +        }
 +      } finally {
 +        if (file != null) {
 +          mobCacheConfig.getMobFileCache().closeFile(file);
 +        }
 +      }
 +    }
 +    LOG.error("The mob file " + fileName + " could not be found in the locations "
 +        + locations);
 +    return null;
 +  }
 +
 +  /**
 +   * Reads the cell from a mob file of source table.
 +   * The table might be cloned, in this case only hfile link is created in the new table,
 +   * and the mob file is located in the source table directories.
 +   * 1. The working directory of the source table.
 +   * 2. The archive directory of the source table.
 +   * Reads the cell from the files located in both of the above directories.
 +   * @param fileName The file to be read.
 +   * @param search The cell to be searched.
 +   * @param cacheMobBlocks Whether the scanner should cache blocks.
 +   * @return The found cell. Null if there's no such cell.
 +   * @throws IOException
 +   */
 +  private Cell readClonedCell(String fileName, Cell search, boolean cacheMobBlocks)
 +      throws IOException {
 +    Tag tableNameTag = MobUtils.getTableNameTag(search);
 +    if (tableNameTag == null) {
 +      return null;
 +    }
 +    byte[] tableName = tableNameTag.getValue();
 +    if (Bytes.equals(this.getTableName().getName(), tableName)) {
 +      return null;
 +    }
 +    // the possible locations in the source table.
 +    List<Path> locations = new ArrayList<Path>();
 +    TableName tn = TableName.valueOf(tableName);
 +    locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
 +    locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
 +        .getEncodedName(), family.getNameAsString()));
 +    // read the cell from the source table.
 +    return readCell(locations, fileName, search, cacheMobBlocks);
 +  }
 +
 +  /**
 +   * Gets the mob file path.
 +   * @return The mob file path.
 +   */
 +  public Path getPath() {
 +    return mobFamilyPath;
 +  }
 +
 +  /**
 +   * The compaction in the store of mob.
 +   * The cells in this store contain the paths of the mob files. There might be a race
 +   * condition between the major compaction and the sweeping in mob files.
 +   * In order to avoid this, we need to mutually exclude the running of the major compaction and
 +   * the sweeping in mob files.
 +   * The minor compaction is not affected.
 +   * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress.
 +   */
 +  @Override
-   public List<StoreFile> compact(CompactionContext compaction) throws IOException {
++  public List<StoreFile> compact(CompactionContext compaction,
++      CompactionThroughputController throughputController) throws IOException {
 +    // If it's a major compaction, check whether a sweeper is running.
 +    // If yes, mark the major compaction as retainDeleteMarkers.
 +    if (compaction.getRequest().isAllFiles()) {
 +      // Use the Zookeeper to coordinate.
 +      // 1. Acquire a operation lock.
 +      //   1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction.
 +      //   1.2. If the lock is obtained, search the node of sweeping.
 +      //      1.2.1. If the node is there, the sweeping is in progress, mark the major
 +      //             compaction as retainDeleteMarkers and continue the compaction.
 +      //      1.2.2. If the node is not there, add a child to the major compaction node, and
 +      //             run the compaction directly.
 +      TableLock lock = null;
 +      if (tableLockManager != null) {
 +        lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
 +      }
 +      boolean tableLocked = false;
 +      String tableName = getTableName().getNameAsString();
 +      if (lock != null) {
 +        try {
 +          LOG.info("Start to acquire a read lock for the table[" + tableName
 +              + "], ready to perform the major compaction");
 +          lock.acquire();
 +          tableLocked = true;
 +        } catch (Exception e) {
 +          LOG.error("Fail to lock the table " + tableName, e);
 +        }
 +      } else {
 +        // If the tableLockManager is null, mark the tableLocked as true.
 +        tableLocked = true;
 +      }
 +      try {
 +        if (!tableLocked) {
 +          LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
 +              + tableName + "], forcing the delete markers to be retained");
 +          compaction.getRequest().forceRetainDeleteMarkers();
 +        }
-         return super.compact(compaction);
++        return super.compact(compaction, throughputController);
 +      } finally {
 +        if (tableLocked && lock != null) {
 +          try {
 +            lock.release();
 +          } catch (IOException e) {
 +            LOG.error("Fail to release the table lock " + tableName, e);
 +          }
 +        }
 +      }
 +    } else {
 +      // If it's not a major compaction, continue the compaction.
-       return super.compact(compaction);
++      return super.compact(compaction, throughputController);
 +    }
 +  }
 +
 +  public void updateMobCompactedIntoMobCellsCount(long count) {
 +    mobCompactedIntoMobCellsCount += count;
 +  }
 +
 +  public long getMobCompactedIntoMobCellsCount() {
 +    return mobCompactedIntoMobCellsCount;
 +  }
 +
 +  public void updateMobCompactedFromMobCellsCount(long count) {
 +    mobCompactedFromMobCellsCount += count;
 +  }
 +
 +  public long getMobCompactedFromMobCellsCount() {
 +    return mobCompactedFromMobCellsCount;
 +  }
 +
 +  public void updateMobCompactedIntoMobCellsSize(long size) {
 +    mobCompactedIntoMobCellsSize += size;
 +  }
 +
 +  public long getMobCompactedIntoMobCellsSize() {
 +    return mobCompactedIntoMobCellsSize;
 +  }
 +
 +  public void updateMobCompactedFromMobCellsSize(long size) {
 +    mobCompactedFromMobCellsSize += size;
 +  }
 +
 +  public long getMobCompactedFromMobCellsSize() {
 +    return mobCompactedFromMobCellsSize;
 +  }
 +
 +  public void updateMobFlushCount() {
 +    mobFlushCount++;
 +  }
 +
 +  public long getMobFlushCount() {
 +    return mobFlushCount;
 +  }
 +
 +  public void updateMobFlushedCellsCount(long count) {
 +    mobFlushedCellsCount += count;
 +  }
 +
 +  public long getMobFlushedCellsCount() {
 +    return mobFlushedCellsCount;
 +  }
 +
 +  public void updateMobFlushedCellsSize(long size) {
 +    mobFlushedCellsSize += size;
 +  }
 +
 +  public long getMobFlushedCellsSize() {
 +    return mobFlushedCellsSize;
 +  }
 +
 +  public void updateMobScanCellsCount(long count) {
 +    mobScanCellsCount += count;
 +  }
 +
 +  public long getMobScanCellsCount() {
 +    return mobScanCellsCount;
 +  }
 +
 +  public void updateMobScanCellsSize(long size) {
 +    mobScanCellsSize += size;
 +  }
 +
 +  public long getMobScanCellsSize() {
 +    return mobScanCellsSize;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index df5c900,53e732a..ab0165d
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@@ -33,9 -34,9 +34,10 @@@ import java.util.HashSet
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
 +import java.util.Map.Entry;
  import java.util.NavigableMap;
  import java.util.NavigableSet;
+ import java.util.RandomAccess;
  import java.util.Set;
  import java.util.TreeMap;
  import java.util.concurrent.Callable;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e11aac2,c170a65..787828b
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@@ -347,9 -372,7 +373,9 @@@ public class HRegionServer extends HasT
    private final RegionServerAccounting regionServerAccounting;
  
    // Cache configuration and block cache reference
-   final CacheConfig cacheConfig;
+   protected CacheConfig cacheConfig;
 +  // Cache configuration for mob
 +  final MobCacheConfig mobCacheConfig;
  
    /** The health check chore. */
    private HealthCheckChore healthCheckChore;
@@@ -831,10 -933,9 +938,10 @@@
        }
      }
      // Send cache a shutdown.
-     if (cacheConfig.isBlockCacheEnabled()) {
+     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
        cacheConfig.getBlockCache().shutdown();
      }
 +    mobCacheConfig.getMobFileCache().shutdown();
  
      if (movedRegionsCleaner != null) {
        movedRegionsCleaner.stop("Region Server stopping");

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b9f4038,252e5e1..f5bb67a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@@ -55,10 -55,12 +55,13 @@@ import org.apache.hadoop.hbase.HColumnD
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.KeyValueUtil;
  import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.KeyValue.KVComparator;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
+ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.conf.ConfigurationManager;
  import org.apache.hadoop.hbase.io.compress.Compression;
  import org.apache.hadoop.hbase.io.crypto.Cipher;
  import org.apache.hadoop.hbase.io.crypto.Encryption;
@@@ -133,11 -137,11 +138,11 @@@ public class HStore implements Store 
  
    protected final MemStore memstore;
    // This stores directory in the filesystem.
 -  private final HRegion region;
 +  protected final HRegion region;
    private final HColumnDescriptor family;
    private final HRegionFileSystem fs;
-   protected final Configuration conf;
 -  private Configuration conf;
 -  private final CacheConfig cacheConf;
++  protected Configuration conf;
 +  protected CacheConfig cacheConf;
    private long lastCompactSize = 0;
    volatile boolean forceMajor = false;
    /* how many bytes to write between status checks */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 4384d87,5e5590d..159ec55
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@@ -32,10 -33,11 +33,13 @@@ import org.apache.hadoop.hbase.ServerNa
  import org.apache.hadoop.hbase.io.hfile.BlockCache;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
  import org.apache.hadoop.hbase.io.hfile.CacheStats;
 +import org.apache.hadoop.hbase.mob.MobCacheConfig;
 +import org.apache.hadoop.hbase.mob.MobFileCache;
+ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.hbase.util.FSUtils;
  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
  import org.apache.hadoop.metrics2.MetricsExecutor;
  
  /**
@@@ -50,11 -52,10 +54,11 @@@ class MetricsRegionServerWrapperImp
    private final HRegionServer regionServer;
  
    private BlockCache blockCache;
 +  private MobFileCache mobFileCache;
  
    private volatile long numStores = 0;
-   private volatile long numHLogFiles = 0;
-   private volatile long hlogFileSize = 0;
+   private volatile long numWALFiles = 0;
+   private volatile long walFileSize = 0;
    private volatile long numStoreFiles = 0;
    private volatile long memstoreSize = 0;
    private volatile long storeFileSize = 0;
@@@ -75,20 -76,7 +79,21 @@@
    private volatile long flushedCellsSize = 0;
    private volatile long compactedCellsSize = 0;
    private volatile long majorCompactedCellsSize = 0;
 +  private volatile long mobCompactedIntoMobCellsCount = 0;
 +  private volatile long mobCompactedFromMobCellsCount = 0;
 +  private volatile long mobCompactedIntoMobCellsSize = 0;
 +  private volatile long mobCompactedFromMobCellsSize = 0;
 +  private volatile long mobFlushCount = 0;
 +  private volatile long mobFlushedCellsCount = 0;
 +  private volatile long mobFlushedCellsSize = 0;
 +  private volatile long mobScanCellsCount = 0;
 +  private volatile long mobScanCellsSize = 0;
 +  private volatile long mobFileCacheAccessCount = 0;
 +  private volatile long mobFileCacheMissCount = 0;
 +  private volatile double mobFileCacheHitRatio = 0;
 +  private volatile long mobFileCacheEvictedCount = 0;
 +  private volatile long mobFileCacheCount = 0;
+   private volatile long blockedRequestsCount = 0L;
  
    private CacheStats cacheStats;
    private ScheduledExecutorService executor;
@@@ -526,15 -450,7 +549,16 @@@
        long tempFlushedCellsSize = 0;
        long tempCompactedCellsSize = 0;
        long tempMajorCompactedCellsSize = 0;
 +      long tempMobCompactedIntoMobCellsCount = 0;
 +      long tempMobCompactedFromMobCellsCount = 0;
 +      long tempMobCompactedIntoMobCellsSize = 0;
 +      long tempMobCompactedFromMobCellsSize = 0;
 +      long tempMobFlushCount = 0;
 +      long tempMobFlushedCellsCount = 0;
 +      long tempMobFlushedCellsSize = 0;
 +      long tempMobScanCellsCount = 0;
 +      long tempMobScanCellsSize = 0;
+       long tempBlockedRequestsCount = 0L;
  
        for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
          tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get();
@@@ -631,20 -526,22 +646,36 @@@
        flushedCellsSize = tempFlushedCellsSize;
        compactedCellsSize = tempCompactedCellsSize;
        majorCompactedCellsSize = tempMajorCompactedCellsSize;
 +      mobCompactedIntoMobCellsCount = tempMobCompactedIntoMobCellsCount;
 +      mobCompactedFromMobCellsCount = tempMobCompactedFromMobCellsCount;
 +      mobCompactedIntoMobCellsSize = tempMobCompactedIntoMobCellsSize;
 +      mobCompactedFromMobCellsSize = tempMobCompactedFromMobCellsSize;
 +      mobFlushCount = tempMobFlushCount;
 +      mobFlushedCellsCount = tempMobFlushedCellsCount;
 +      mobFlushedCellsSize = tempMobFlushedCellsSize;
 +      mobScanCellsCount = tempMobScanCellsCount;
 +      mobScanCellsSize = tempMobScanCellsSize;
 +      mobFileCacheAccessCount = mobFileCache.getAccessCount();
 +      mobFileCacheMissCount = mobFileCache.getMissCount();
 +      mobFileCacheHitRatio = mobFileCache.getHitRatio();
 +      mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount();
 +      mobFileCacheCount = mobFileCache.getCacheSize();
+       blockedRequestsCount = tempBlockedRequestsCount;
      }
    }
+ 
+   @Override
+   public long getHedgedReadOps() {
+     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
+   }
+ 
+   @Override
+   public long getHedgedReadWins() {
+     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
+   }
+ 
+   @Override
+   public long getBlockedRequestsCount() {
+     return blockedRequestsCount;
+   }
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 13967c2,a92c17e..7870040
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@@ -43,10 -43,9 +43,12 @@@ import org.apache.hadoop.hbase.regionse
  import org.apache.hadoop.hbase.regionserver.StoreFile;
  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
  import org.apache.hadoop.hbase.regionserver.StoreScanner;
 +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
  import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.Writables;
 +import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
  
  /**
   * A compactor is a compaction algorithm associated a given policy. Base class also contains
@@@ -226,64 -220,87 +233,90 @@@ public abstract class Compactor 
      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
    }
  
++  // TODO mob introduced the fd parameter; can we make this cleaner and easier to extend in future?
    /**
     * Performs the compaction.
-    * @param fd File details
++   * @param fd FileDetails of cell sink writer
     * @param scanner Where to read from.
     * @param writer Where to write to.
     * @param smallestReadPoint Smallest read point.
     * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
 +   * @param major Is a major compaction.
     * @return Whether compaction ended; false if it was interrupted for some reason.
     */
 -  protected boolean performCompaction(InternalScanner scanner, CellSink writer,
 +  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
-       long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
-     int bytesWritten = 0;
+       long smallestReadPoint, boolean cleanSeqId,
 -      CompactionThroughputController throughputController) throws IOException {
++      CompactionThroughputController throughputController, boolean major) throws IOException {
+     long bytesWritten = 0;
+     long bytesWrittenProgress = 0;
      // Since scanner.next() can return 'false' but still be delivering data,
      // we have to use a do/while loop.
-     List<Cell> kvs = new ArrayList<Cell>();
-     int closeCheckInterval = HStore.getCloseCheckInterval();
-     long lastMillis;
+     List<Cell> cells = new ArrayList<Cell>();
+     long closeCheckInterval = HStore.getCloseCheckInterval();
+     long lastMillis = 0;
      if (LOG.isDebugEnabled()) {
-       lastMillis = System.currentTimeMillis();
-     } else {
-       lastMillis = 0;
+       lastMillis = EnvironmentEdgeManager.currentTime();
      }
+     String compactionName =
+         store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
+     long now = 0;
      boolean hasMore;
-     do {
-       hasMore = scanner.next(kvs, compactionKVMax);
-       // output to writer:
-       for (Cell c : kvs) {
-         KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-         resetSeqId(smallestReadPoint, cleanSeqId, kv);
-         writer.append(kv);
-         ++progress.currentCompactedKVs;
-         progress.totalCompactedSize += kv.getLength();
- 
-         // check periodically to see if a system stop is requested
-         if (closeCheckInterval > 0) {
-           bytesWritten += kv.getLength();
-           if (bytesWritten > closeCheckInterval) {
-             // Log the progress of long running compactions every minute if
-             // logging at DEBUG level
-             if (LOG.isDebugEnabled()) {
-               long now = System.currentTimeMillis();
-               if ((now - lastMillis) >= 60 * 1000) {
-                 LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
-                   (bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0)));
-                 lastMillis = now;
+     throughputController.start(compactionName);
+     try {
+       do {
+         hasMore = scanner.next(cells, compactionKVMax);
+         if (LOG.isDebugEnabled()) {
+           now = EnvironmentEdgeManager.currentTime();
+         }
+         // output to writer:
+         for (Cell c : cells) {
+           if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
+             CellUtil.setSequenceId(c, 0);
+           }
+           writer.append(c);
+           int len = KeyValueUtil.length(c);
+           ++progress.currentCompactedKVs;
+           progress.totalCompactedSize += len;
+           if (LOG.isDebugEnabled()) {
+             bytesWrittenProgress += len;
+           }
+           throughputController.control(compactionName, len);
+           // check periodically to see if a system stop is requested
+           if (closeCheckInterval > 0) {
+             bytesWritten += len;
+             if (bytesWritten > closeCheckInterval) {
+               bytesWritten = 0;
+               if (!store.areWritesEnabled()) {
+                 progress.cancel();
+                 return false;
                }
              }
-             bytesWritten = 0;
-             if (!store.areWritesEnabled()) {
-               progress.cancel();
-               return false;
-             }
            }
          }
-       }
-       kvs.clear();
-     } while (hasMore);
+         // Log the progress of long running compactions every minute if
+         // logging at DEBUG level
+         if (LOG.isDebugEnabled()) {
+           if ((now - lastMillis) >= 60 * 1000) {
+             LOG.debug("Compaction progress: "
+                 + compactionName
+                 + " "
+                 + progress
+                 + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
+                     / ((now - lastMillis) / 1000.0)) + ", throughputController is "
+                 + throughputController);
+             lastMillis = now;
+             bytesWrittenProgress = 0;
+           }
+         }
+         cells.clear();
+       } while (hasMore);
+     } catch (InterruptedException e) {
+       progress.cancel();
+       throw new InterruptedIOException("Interrupted while control throughput of compacting "
+           + compactionName);
+     } finally {
+       throughputController.finish(compactionName);
+     }
      progress.complete();
      return true;
    }
@@@ -321,29 -338,4 +354,17 @@@
      return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
          earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
    }
 +
 +  /**
-    * Resets the sequence id.
-    * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
-    * @param cleanSeqId Should clean the sequence id.
-    * @param kv The current KeyValue.
-    */
-   protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) {
-     if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
-       kv.setSequenceId(0);
-     }
-   }
- 
-   /**
 +   * Appends the metadata and closes the writer.
 +   * @param writer The current store writer.
 +   * @param fd The file details.
 +   * @param isMajor Is a major compaction.
 +   * @throws IOException
 +   */
 +  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
 +      boolean isMajor) throws IOException {
 +    writer.appendMetadata(fd.maxSeqId, isMajor);
 +    writer.close();
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index be859c0,5d712c1..090be8c
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@@ -76,9 -98,11 +98,10 @@@ public class DefaultCompactor extends C
            smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
            cleanSeqId = true;
          }
 -        
 -        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
 -            true, fd.maxTagsLength > 0);
 +        writer = createTmpWriter(fd, smallestReadPoint);
-         boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
-             request.isAllFiles());
+         boolean finished =
 -            performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
++            performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController,
++                    request.isAllFiles());
          if (!finished) {
            writer.close();
            store.getFileSystem().delete(writer.getPath(), false);
@@@ -102,22 -146,8 +145,22 @@@
    }
  
    /**
 +   * Creates a writer for a new file in a temporary directory.
 +   * @param fd The file details.
 +   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
 +   * @return Writer for a new StoreFile in the tmp dir.
 +   * @throws IOException
 +   */
 +  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
 +      throws IOException {
 +    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
 +        true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
 +    return writer;
 +  }
 +
 +  /**
     * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
-    * {@link #compact(CompactionRequest)};
+    * {@link #compact(CompactionRequest, CompactionThroughputController)};
     * @param filesToCompact the files to compact. These are used as the compactionSelection for
     *          the generated {@link CompactionRequest}.
     * @param isMajor true to major compact (prune all deletes, max versions, etc)

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 3109015,b957e16..fab4c2f
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@@ -127,8 -130,8 +130,9 @@@ public class StripeCompactor extends Co
        // It is ok here if storeScanner is null.
        StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
        mw.init(storeScanner, factory, store.getComparator());
-       finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId,
-           request.isMajor());
+       finished =
 -          performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController);
++          performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, throughputController,
++                  request.isMajor());
        if (!finished) {
          throw new InterruptedIOException( "Aborting compaction of store " + store +
              " in region " + store.getRegionInfo().getRegionNameAsString() +

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index 0d7efe7,2655e2b..841bc04
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@@ -52,21 -51,16 +51,20 @@@ import org.apache.hadoop.hbase.HConstan
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.io.FileLink;
  import org.apache.hadoop.hbase.io.HFileLink;
- import org.apache.hadoop.hbase.io.HLogLink;
 +import org.apache.hadoop.hbase.io.hfile.HFile;
 +import org.apache.hadoop.hbase.mapreduce.JobUtil;
+ import org.apache.hadoop.hbase.io.WALLink;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 +import org.apache.hadoop.hbase.mob.MobUtils;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
- import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
  import org.apache.hadoop.hbase.util.Pair;
  import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.IOUtils;
  import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.JobContext;
@@@ -423,10 -433,10 +437,10 @@@ public class ExportSnapshot extends Con
          switch (fileInfo.getType()) {
            case HFILE:
              Path inputPath = new Path(fileInfo.getHfile());
 -            link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
 +            link = getFileLink(inputPath, conf);
              break;
            case WAL:
-             link = new HLogLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
+             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
              break;
            default:
              throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
@@@ -442,16 -452,6 +456,16 @@@
        }
      }
  
 +    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
 +      String regionName = HFileLink.getReferencedRegionName(path.getName());
 +      TableName tableName = HFileLink.getReferencedTableName(path.getName());
 +      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
-         return new HFileLink(MobUtils.getQualifiedMobRootDir(conf),
++        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
 +                HFileArchiveUtil.getArchivePath(conf), path);
 +      }
-       return new HFileLink(inputRoot, inputArchive, path);
++      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
 +    }
 +
      private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
        try {
          return fs.getFileChecksum(path);

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index 6f7d847,330ead4..9d3407a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@@ -34,17 -34,12 +34,18 @@@ import org.apache.hadoop.hbase.classifi
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.TableDescriptor;
  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index 2a599d3,d1f787a..50b5c9a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@@ -39,14 -39,11 +39,14 @@@ import org.apache.hadoop.fs.Path
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.mob.MobUtils;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
- import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 +import org.apache.hadoop.hbase.util.FSUtils;
  import org.apache.hadoop.hbase.util.FSVisitor;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
  
  /**
   * Utility methods for interacting with the snapshot referenced files.
@@@ -299,15 -296,7 +299,15 @@@ public final class SnapshotReferenceUti
      }
  
      // check if the linked file exists (in the archive, or in the table dir)
 -    HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, linkPath);
 +    HFileLink link = null;
 +    if (MobUtils.isMobRegionInfo(regionInfo)) {
 +      // for mob region
-       link = new HFileLink(MobUtils.getQualifiedMobRootDir(conf),
++      link = HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
 +          HFileArchiveUtil.getArchivePath(conf), linkPath);
 +    } else {
 +      // not mob region
-       link = new HFileLink(conf, linkPath);
++      link = HFileLink.buildFromHFileLinkPattern(conf, linkPath);
 +    }
      try {
        FileStatus fstat = link.getFileStatus(fs);
        if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index a939422,0000000..27d53ba
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@@ -1,250 -1,0 +1,251 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.client;
 +
 +import java.io.IOException;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.LargeTests;
 +import org.apache.hadoop.hbase.NamespaceDescriptor;
 +import org.apache.hadoop.hbase.NamespaceNotFoundException;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 +import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
 +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
++import org.apache.hadoop.hbase.testclassification.ClientTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +/**
 + * Test clone snapshots from the client
 + */
- @Category(LargeTests.class)
++@Category({LargeTests.class, ClientTests.class})
 +public class TestMobCloneSnapshotFromClient {
 +  final Log LOG = LogFactory.getLog(getClass());
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  private final byte[] FAMILY = Bytes.toBytes("cf");
 +
 +  private byte[] emptySnapshot;
 +  private byte[] snapshotName0;
 +  private byte[] snapshotName1;
 +  private byte[] snapshotName2;
 +  private int snapshot0Rows;
 +  private int snapshot1Rows;
 +  private TableName tableName;
 +  private Admin admin;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
 +    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
 +    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
 +    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
 +    TEST_UTIL.getConfiguration().setBoolean(
 +        "hbase.master.enabletable.roundrobin", true);
 +    TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
 +    TEST_UTIL.startMiniCluster(3);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  /**
 +   * Initialize the tests with a table filled with some data
 +   * and two snapshots (snapshotName0, snapshotName1) of different states.
 +   * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
 +   */
 +  @Before
 +  public void setup() throws Exception {
 +    this.admin = TEST_UTIL.getHBaseAdmin();
 +
 +    long tid = System.currentTimeMillis();
 +    tableName = TableName.valueOf("testtb-" + tid);
 +    emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
 +    snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
 +    snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
 +    snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
 +
 +    // create Table and disable it
 +    MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
 +    admin.disableTable(tableName);
 +
 +    // take an empty snapshot
 +    admin.snapshot(emptySnapshot, tableName);
 +
 +    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +    try {
 +      // enable table and insert data
 +      admin.enableTable(tableName);
-       SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +      snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
 +      admin.disableTable(tableName);
 +
 +      // take a snapshot
 +      admin.snapshot(snapshotName0, tableName);
 +
 +      // enable table and insert more data
 +      admin.enableTable(tableName);
-       SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +      snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
 +      admin.disableTable(tableName);
 +
 +      // take a snapshot of the updated table
 +      admin.snapshot(snapshotName1, tableName);
 +
 +      // re-enable table
 +      admin.enableTable(tableName);
 +    } finally {
 +      table.close();
 +    }
 +  }
 +
 +  protected int getNumReplicas() {
 +    return 1;
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    if (admin.tableExists(tableName)) {
 +      TEST_UTIL.deleteTable(tableName);
 +    }
 +    SnapshotTestingUtils.deleteAllSnapshots(admin);
 +    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
 +  }
 +
 +  @Test(expected=SnapshotDoesNotExistException.class)
 +  public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
 +    String snapshotName = "random-snapshot-" + System.currentTimeMillis();
 +    TableName tableName = TableName.valueOf("random-table-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName, tableName);
 +  }
 +
 +  @Test(expected = NamespaceNotFoundException.class)
 +  public void testCloneOnMissingNamespace() throws IOException, InterruptedException {
 +    TableName clonedTableName = TableName.valueOf("unknownNS:clonetb");
 +    admin.cloneSnapshot(snapshotName1, clonedTableName);
 +  }
 +
 +  @Test
 +  public void testCloneSnapshot() throws IOException, InterruptedException {
 +    TableName clonedTableName = TableName.valueOf("clonedtb-" + System.currentTimeMillis());
 +    testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
 +    testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
 +    testCloneSnapshot(clonedTableName, emptySnapshot, 0);
 +  }
 +
 +  private void testCloneSnapshot(final TableName tableName, final byte[] snapshotName,
 +      int snapshotRows) throws IOException, InterruptedException {
 +    // create a new table from snapshot
 +    admin.cloneSnapshot(snapshotName, tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshotRows);
 +
 +    verifyReplicasCameOnline(tableName);
 +    TEST_UTIL.deleteTable(tableName);
 +  }
 +
 +  protected void verifyReplicasCameOnline(TableName tableName) throws IOException {
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  @Test
 +  public void testCloneSnapshotCrossNamespace() throws IOException, InterruptedException {
 +    String nsName = "testCloneSnapshotCrossNamespace";
 +    admin.createNamespace(NamespaceDescriptor.create(nsName).build());
 +    TableName clonedTableName =
 +        TableName.valueOf(nsName, "clonedtb-" + System.currentTimeMillis());
 +    testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
 +    testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
 +    testCloneSnapshot(clonedTableName, emptySnapshot, 0);
 +  }
 +
 +  /**
 +   * Verify that tables created from the snapshot are still alive after source table deletion.
 +   */
 +  @Test
 +  public void testCloneLinksAfterDelete() throws IOException, InterruptedException {
 +    // Clone a table from the first snapshot
 +    TableName clonedTableName = TableName.valueOf("clonedtb1-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName0, clonedTableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
 +
 +    // Take a snapshot of this cloned table.
 +    admin.disableTable(clonedTableName);
 +    admin.snapshot(snapshotName2, clonedTableName);
 +
 +    // Clone the snapshot of the cloned table
 +    TableName clonedTableName2 = TableName.valueOf("clonedtb2-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName2, clonedTableName2);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
 +    admin.disableTable(clonedTableName2);
 +
 +    // Remove the original table
 +    TEST_UTIL.deleteTable(tableName);
 +    waitCleanerRun();
 +
 +    // Verify the first cloned table
 +    admin.enableTable(clonedTableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
 +
 +    // Verify the second cloned table
 +    admin.enableTable(clonedTableName2);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
 +    admin.disableTable(clonedTableName2);
 +
 +    // Delete the first cloned table
 +    TEST_UTIL.deleteTable(clonedTableName);
 +    waitCleanerRun();
 +
 +    // Verify the second cloned table
 +    admin.enableTable(clonedTableName2);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
 +
 +    // Clone a new table from cloned
 +    TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName2, clonedTableName3);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName3, snapshot0Rows);
 +
 +    // Delete the cloned tables
 +    TEST_UTIL.deleteTable(clonedTableName2);
 +    TEST_UTIL.deleteTable(clonedTableName3);
 +    admin.deleteSnapshot(snapshotName2);
 +  }
 +
 +  // ==========================================================================
 +  //  Helpers
 +  // ==========================================================================
 +
 +  private void waitCleanerRun() throws InterruptedException {
 +    TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
index c75e006,0000000..0bb498d
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
@@@ -1,303 -1,0 +1,304 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.client;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.fail;
 +
 +import java.io.IOException;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.LargeTests;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.master.MasterFileSystem;
 +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
 +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
++import org.apache.hadoop.hbase.testclassification.ClientTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +/**
 + * Tests restoring and cloning snapshots of a MOB-enabled table through the client Admin API.
 + */
- @Category(LargeTests.class)
++@Category({ClientTests.class, LargeTests.class})
 +public class TestMobRestoreSnapshotFromClient {
 +  final Log LOG = LogFactory.getLog(getClass()); // logger for the runtime class
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  private final byte[] FAMILY = Bytes.toBytes("cf"); // family the test table is created with
 +
 +  private byte[] emptySnapshot; // name of the snapshot taken in setup() before any data is loaded
 +  private byte[] snapshotName0; // name of the snapshot taken in setup() after the first load
 +  private byte[] snapshotName1; // name generated in setup(); snapshot taken by testRestoreSnapshot
 +  private byte[] snapshotName2; // name generated in setup(); snapshot taken by individual tests
 +  private int snapshot0Rows; // mob row count measured after the first load in setup()
 +  private int snapshot1Rows; // mob row count measured after the second load in setup()
 +  private TableName tableName; // per-test table, "testtb-" + current time in millis
 +  private Admin admin; // obtained from TEST_UTIL in setup()
 +
 +  /**
 +   * Applies the cluster settings these tests rely on and then brings up the
 +   * mini cluster with {@code startMiniCluster(3)}. Runs once for the whole class.
 +   */
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    final org.apache.hadoop.conf.Configuration conf = TEST_UTIL.getConfiguration();
 +
 +    // Snapshots and online schema updates are switched on explicitly.
 +    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
 +    conf.setBoolean("hbase.online.schema.update.enable", true);
 +
 +    // Server and client tuning used by this test.
 +    conf.setInt("hbase.hstore.compactionThreshold", 10);
 +    conf.setInt("hbase.regionserver.msginterval", 100);
 +    conf.setInt("hbase.client.pause", 250);
 +    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
 +    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
 +
 +    // NOTE(review): a size of 0 presumably disables the mob file cache - confirm.
 +    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
 +
 +    TEST_UTIL.startMiniCluster(3);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster(); // stops the cluster started in setUpBeforeClass()
 +  }
 +
 +  /**
 +   * Initialize the tests with a table filled with some data and two snapshots of
 +   * different states: {@code emptySnapshot} (taken before any data is loaded) and
 +   * {@code snapshotName0} (taken after the first load).
 +   * More data is then loaded and counted into {@code snapshot1Rows}. The names
 +   * {@code snapshotName1} and {@code snapshotName2} are only generated here; the
 +   * snapshots themselves are taken by the individual tests.
 +   * The table is left enabled when this method returns.
 +   */
 +  @Before
 +  public void setup() throws Exception {
 +    this.admin = TEST_UTIL.getHBaseAdmin();
 +
 +    long tid = System.currentTimeMillis();
 +    tableName =
 +        TableName.valueOf("testtb-" + tid);
 +    emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
 +    snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
 +    snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
 +    snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
 +
 +    // create Table and disable it
 +    MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
 +
 +    admin.disableTable(tableName);
 +
 +    // take an empty snapshot
 +    admin.snapshot(emptySnapshot, tableName);
 +
 +    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +    try {
 +      // enable table and insert data
 +      admin.enableTable(tableName);
-     SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +      snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
 +      admin.disableTable(tableName);
 +
 +      // take a snapshot
 +      admin.snapshot(snapshotName0, tableName);
 +
 +      // enable table and insert more data
 +      admin.enableTable(tableName);
-     SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +      snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
 +    } finally {
 +      // release the client handle even when a load, count or admin call above fails
 +      table.close();
 +    }
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    TEST_UTIL.deleteTable(tableName); // drop the per-test table named in setup()
 +    SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin()); // setup() + test snapshots
 +    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
 +  }
 +
 +  /**
 +   * Restores the table to snapshot-0, to the empty snapshot and to snapshot-1 in turn,
 +   * checking the mob row count and the replicas after each step, and finally restores
 +   * snapshot-1 again after the table has been deleted.
 +   */
 +  @Test
 +  public void testRestoreSnapshot() throws IOException {
 +    // setup() left the table enabled with the rows counted in snapshot1Rows
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
 +
 +    // Capture the current state as snapshot-1, then go back to snapshot-0
 +    admin.disableTable(tableName);
 +    admin.snapshot(snapshotName1, tableName);
 +    restoreEnableAndVerify(snapshotName0, snapshot0Rows);
 +
 +    // Go back to the empty snapshot
 +    admin.disableTable(tableName);
 +    restoreEnableAndVerify(emptySnapshot, 0);
 +
 +    // Go forward to snapshot-1
 +    admin.disableTable(tableName);
 +    restoreEnableAndVerify(snapshotName1, snapshot1Rows);
 +
 +    // Restore snapshot-1 once more, this time after deleting the table
 +    TEST_UTIL.deleteTable(tableName);
 +    admin.restoreSnapshot(snapshotName1);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  /**
 +   * Restores {@code snapshot} onto the (already disabled) test table, re-enables the
 +   * table and verifies its mob row count and its replicas.
 +   *
 +   * @param snapshot name of the snapshot to restore
 +   * @param expectedRows mob row count the table must have afterwards
 +   */
 +  private void restoreEnableAndVerify(byte[] snapshot, int expectedRows) throws IOException {
 +    admin.restoreSnapshot(snapshot);
 +    admin.enableTable(tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, expectedRows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  protected int getNumReplicas() {
 +    return 1; // replica count used when creating the table and when verifying replicas
 +  }
 +
 +  /**
 +   * Adds a second mob-enabled family to the table, snapshots it, and checks that
 +   * restoring snapshot-0 removes that family (descriptor, data and file-system
 +   * directories) while restoring the newer snapshot brings it back.
 +   */
 +  @Test
 +  public void testRestoreSchemaChange() throws Exception {
 +    byte[] TEST_FAMILY2 = Bytes.toBytes("cf2");
 +
 +    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
 +    try {
 +      // Add one column family and put some data in it
 +      admin.disableTable(tableName);
 +      HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
 +      hcd.setMobEnabled(true);
 +      hcd.setMobThreshold(3L);
 +      admin.addColumn(tableName, hcd);
 +      admin.enableTable(tableName);
 +      assertEquals(2, table.getTableDescriptor().getFamilies().size());
 +      HTableDescriptor htd = admin.getTableDescriptor(tableName);
 +      assertEquals(2, htd.getFamilies().size());
-     SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2);
++      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
 +      long snapshot2Rows = snapshot1Rows + 500;
 +      assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
 +      assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
 +      Set<String> fsFamilies = getFamiliesFromFS(tableName);
 +      assertEquals(2, fsFamilies.size());
 +
 +      // Take a snapshot
 +      admin.disableTable(tableName);
 +      admin.snapshot(snapshotName2, tableName);
 +
 +      // Restore the snapshot (without the cf)
 +      admin.restoreSnapshot(snapshotName0);
 +      admin.enableTable(tableName);
 +      assertEquals(1, table.getTableDescriptor().getFamilies().size());
 +      try {
 +        MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2);
 +        fail("family '" + Bytes.toString(TEST_FAMILY2) + "' should not exists");
 +      } catch (NoSuchColumnFamilyException e) {
 +        // expected
 +      }
 +      assertEquals(snapshot0Rows, MobSnapshotTestingUtils.countMobRows(table));
 +      htd = admin.getTableDescriptor(tableName);
 +      assertEquals(1, htd.getFamilies().size());
 +      fsFamilies = getFamiliesFromFS(tableName);
 +      assertEquals(1, fsFamilies.size());
 +
 +      // Restore back the snapshot (with the cf)
 +      admin.disableTable(tableName);
 +      admin.restoreSnapshot(snapshotName2);
 +      admin.enableTable(tableName);
 +      htd = admin.getTableDescriptor(tableName);
 +      assertEquals(2, htd.getFamilies().size());
 +      assertEquals(2, table.getTableDescriptor().getFamilies().size());
 +      assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
 +      assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
 +      fsFamilies = getFamiliesFromFS(tableName);
 +      assertEquals(2, fsFamilies.size());
 +    } finally {
 +      // close the client handle even when an assertion or admin call above fails
 +      table.close();
 +    }
 +  }
 +
 +  /**
 +   * Clones snapshot-0 into a new table, snapshots that clone, deletes it, and then
 +   * clones the second-generation snapshot under the same table name. Both clones
 +   * must contain the rows of snapshot-0.
 +   */
 +  @Test
 +  public void testCloneSnapshotOfCloned() throws IOException, InterruptedException {
 +    final TableName cloneTarget = TableName.valueOf("clonedtb-" + System.currentTimeMillis());
 +
 +    // First generation: clone directly from snapshot-0
 +    cloneAndVerify(snapshotName0, cloneTarget);
 +
 +    // Snapshot the clone, then drop it and run the cleaner
 +    admin.disableTable(cloneTarget);
 +    admin.snapshot(snapshotName2, cloneTarget);
 +    TEST_UTIL.deleteTable(cloneTarget);
 +    waitCleanerRun();
 +
 +    // Second generation: clone from the snapshot of the clone
 +    cloneAndVerify(snapshotName2, cloneTarget);
 +    TEST_UTIL.deleteTable(cloneTarget);
 +  }
 +
 +  /**
 +   * Clones {@code snapshot} into {@code target} and verifies that the new table holds
 +   * {@code snapshot0Rows} mob rows and that its replicas came online.
 +   */
 +  private void cloneAndVerify(byte[] snapshot, TableName target)
 +      throws IOException, InterruptedException {
 +    admin.cloneSnapshot(snapshot, target);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, target, snapshot0Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(target, admin, getNumReplicas());
 +  }
 +
 +  /**
 +   * Deletes the test table, re-creates it under the same name by cloning snapshot-0,
 +   * and then restores snapshot-0 onto that clone. The row count and the replicas are
 +   * verified after the clone and again after the restore.
 +   */
 +  @Test
 +  public void testCloneAndRestoreSnapshot() throws IOException, InterruptedException {
 +    final byte[] snapshot = snapshotName0;
 +    final int expectedRows = snapshot0Rows;
 +
 +    // Remove the table built by setup() and run the cleaner
 +    TEST_UTIL.deleteTable(tableName);
 +    waitCleanerRun();
 +
 +    // Bring the table back as a clone of the snapshot
 +    admin.cloneSnapshot(snapshot, tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, expectedRows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +    waitCleanerRun();
 +
 +    // Restore the same snapshot on top of the clone
 +    admin.disableTable(tableName);
 +    admin.restoreSnapshot(snapshot);
 +    admin.enableTable(tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, expectedRows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  /**
 +   * Corrupts snapshot-0 and checks that cloning it is rejected with a
 +   * {@link CorruptedSnapshotException} and leaves no cloned table behind.
 +   * Any other exception fails the test and is attached as the cause of the failure.
 +   */
 +  @Test
 +  public void testCorruptedSnapshot() throws IOException, InterruptedException {
 +    SnapshotTestingUtils.corruptSnapshot(TEST_UTIL, Bytes.toString(snapshotName0));
 +    TableName cloneName = TableName.valueOf("corruptedClone-" + System.currentTimeMillis());
 +    try {
 +      admin.cloneSnapshot(snapshotName0, cloneName);
 +      fail("Expected CorruptedSnapshotException, got succeeded cloneSnapshot()");
 +    } catch (CorruptedSnapshotException e) {
 +      // Got the expected corruption exception.
 +      // check for no references of the cloned table.
 +      assertFalse(admin.tableExists(cloneName));
 +    } catch (Exception e) {
 +      // Same failure type and message as fail(...), but the unexpected exception is
 +      // kept as the cause so its stack trace shows up in the test report.
 +      AssertionError failure =
 +          new AssertionError("Expected CorruptedSnapshotException got: " + e);
 +      failure.initCause(e);
 +      throw failure;
 +    }
 +  }
 +
 +  // ==========================================================================
 +  //  Helpers
 +  // ==========================================================================
 +  /**
 +   * Invokes the master's HFile cleaner chore on demand through its
 +   * {@code choreForTesting()} hook. The tests call this after deleting or cloning a table.
 +   */
 +  private void waitCleanerRun() throws InterruptedException {
 +    TEST_UTIL.getMiniHBaseCluster()
 +        .getMaster()
 +        .getHFileCleaner()
 +        .choreForTesting();
 +  }
 +
 +  private Set<String> getFamiliesFromFS(final TableName tableName) throws IOException {
 +    MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
 +    Set<String> families = new HashSet<String>();
 +    Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
 +    for (Path regionDir: FSUtils.getRegionDirs(mfs.getFileSystem(), tableDir)) {
 +      for (Path familyDir: FSUtils.getFamilyDirs(mfs.getFileSystem(), regionDir)) {
 +        families.add(familyDir.getName());
 +      }
 +    }
 +    return families;
 +  }
 +}