Posted to commits@hbase.apache.org by jm...@apache.org on 2015/05/01 17:28:34 UTC

[48/50] [abbrv] hbase git commit: Merge branch 'apache/master' (4/16/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
index d286b72,0000000..37d4461
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@@ -1,180 -1,0 +1,182 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import java.io.IOException;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.client.BufferedMutator;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
 +import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 +import org.apache.hadoop.hbase.regionserver.MemStore;
 +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.mapreduce.Reducer.Context;
 +
 +/**
 + * A wrapper around a DefaultMemStore, used in the sweep reducer to buffer and sort the
 + * cells read from the invalid and small mob files.
 + * When the wrapper is full it is flushed: the mob data is written to new mob files, and
 + * the new file names are written back to the store files of HBase.
 + * In a reducer of the sweep tool, mob files are grouped by prefix (start key and date);
 + * within each group the reducer iterates over the files and copies their cells into a
 + * new, bigger mob file. Cells within one mob file are ordered, but cells across mob
 + * files are not, so this MemStoreWrapper sorts the cells coming from different mob files
 + * before flushing them to disk. When the memstore is big enough, it is flushed as a new
 + * mob file.
 + */
 +@InterfaceAudience.Private
 +public class MemStoreWrapper {
 +
 +  private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
 +
 +  private MemStore memstore;
 +  private long flushSize;
 +  private SweepPartitionId partitionId;
 +  private Context context;
 +  private Configuration conf;
-   private HTable table;
++  private BufferedMutator table;
 +  private HColumnDescriptor hcd;
 +  private Path mobFamilyDir;
 +  private FileSystem fs;
 +  private CacheConfig cacheConfig;
 +
-   public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd,
++  public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
++      HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
 +    this.memstore = memstore;
 +    this.context = context;
 +    this.fs = fs;
 +    this.table = table;
 +    this.hcd = hcd;
 +    this.conf = context.getConfiguration();
 +    this.cacheConfig = cacheConfig;
 +    flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
 +        MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
 +    mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
 +  }
 +
 +  public void setPartitionId(SweepPartitionId partitionId) {
 +    this.partitionId = partitionId;
 +  }
 +
 +  /**
 +   * Flushes the memstore if the size is large enough.
 +   * @throws IOException
 +   */
 +  private void flushMemStoreIfNecessary() throws IOException {
 +    if (memstore.heapSize() >= flushSize) {
 +      flushMemStore();
 +    }
 +  }
 +
 +  /**
 +   * Flushes the memstore unconditionally.
 +   * @throws IOException
 +   */
 +  public void flushMemStore() throws IOException {
 +    MemStoreSnapshot snapshot = memstore.snapshot();
 +    internalFlushCache(snapshot);
 +    memstore.clearSnapshot(snapshot.getId());
 +  }
 +
 +  /**
 +   * Flushes a snapshot of the memstore: writes the mob data to mob files, and writes the
 +   * names of these mob files back to HBase.
 +   * @param snapshot The snapshot of the memstore.
 +   * @throws IOException
 +   */
 +  private void internalFlushCache(final MemStoreSnapshot snapshot)
 +      throws IOException {
 +    if (snapshot.getCellsCount() == 0) {
 +      return;
 +    }
 +    // generate the files into a temp directory.
 +    String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
 +    StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
 +        partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
 +        hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
 +
 +    String relativePath = mobFileWriter.getPath().getName();
 +    LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
 +
 +    byte[] referenceValue = Bytes.toBytes(relativePath);
 +    KeyValueScanner scanner = snapshot.getScanner();
 +    Cell cell = null;
 +    while (null != (cell = scanner.next())) {
 +      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +      mobFileWriter.append(kv);
 +    }
 +    scanner.close();
 +    // Write out the log sequence number that corresponds to this output
 +    // hfile. The hfile is current up to and including logCacheFlushId.
 +    mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
 +    mobFileWriter.close();
 +
 +    MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
 +    context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
 +    // write reference/fileName back to the store files of HBase.
 +    scanner = snapshot.getScanner();
 +    scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
 +    cell = null;
-     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName());
++    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE,
++        Bytes.toBytes(this.table.getName().toString()));
 +    while (null != (cell = scanner.next())) {
 +      KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
 +      Put put =
 +          new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
 +      put.add(reference);
-       table.put(put);
++      table.mutate(put);
 +      context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
 +    }
-     table.flushCommits();
++    table.flush();
 +    scanner.close();
 +  }
 +
 +  /**
 +   * Adds a KeyValue into the memstore.
 +   * @param kv The KeyValue to be added.
 +   * @throws IOException
 +   */
 +  public void addToMemstore(KeyValue kv) throws IOException {
 +    memstore.add(kv);
 +    // flush the memstore if it's full.
 +    flushMemStoreIfNecessary();
 +  }
 +
 +}
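
The MemStoreWrapper changes above are part of the migration from the deprecated HTable
write path to the BufferedMutator API of the 1.0 client: put()/flushCommits() become
mutate()/flush(). A minimal sketch of the new pattern (the table, family and row names
are placeholders, not taken from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.BufferedMutatorParams;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = HBaseConfiguration.create();
    TableName tn = TableName.valueOf("mytable");
    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = connection.getBufferedMutator(
             new BufferedMutatorParams(tn).writeBufferSize(1 * 1024 * 1024))) {
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      mutator.mutate(put); // buffered client-side until the write buffer fills
      mutator.flush();     // explicit flush, replacing HTable.flushCommits()
    }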

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
index 73ca1a2,0000000..cbefd8a
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@@ -1,512 -1,0 +1,509 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.PathFilter;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.client.Admin;
++import org.apache.hadoop.hbase.client.BufferedMutator;
++import org.apache.hadoop.hbase.client.BufferedMutatorParams;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFile;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 +import org.apache.hadoop.hbase.regionserver.BloomType;
 +import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.SequenceFile.CompressionType;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.zookeeper.KeeperException;
 +
 +/**
 + * The reducer of a sweep job.
 + * This reducer merges small mob files into bigger ones and writes the names of the
 + * visited mob files to a sequence file, which the sweep job later uses to delete the
 + * unused mob files.
 + * The input key is a mob file name; the input value is the collection of KeyValues
 + * (each cell formatted as valueLength + fileName) that reference this file in HBase.
 + * From this input the reducer knows how many cells still exist in HBase for the mob
 + * file: if existingCellsSize/mobFileSize < compactionRatio, the mob file needs to be
 + * merged.
 + */
 +@InterfaceAudience.Private
 +public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
 +
 +  private static final Log LOG = LogFactory.getLog(SweepReducer.class);
 +
 +  private SequenceFile.Writer writer = null;
 +  private MemStoreWrapper memstore;
 +  private Configuration conf;
 +  private FileSystem fs;
 +
 +  private Path familyDir;
 +  private CacheConfig cacheConfig;
 +  private long compactionBegin;
-   private HTable table;
++  private BufferedMutator table;
++  private Connection connection;
 +  private HColumnDescriptor family;
 +  private long mobCompactionDelay;
 +  private Path mobTableDir;
 +
 +  @Override
 +  protected void setup(Context context) throws IOException, InterruptedException {
 +    this.conf = context.getConfiguration();
++    this.connection = ConnectionFactory.createConnection(this.conf);
 +    this.fs = FileSystem.get(conf);
 +    // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
 +    mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
 +    String tableName = conf.get(TableInputFormat.INPUT_TABLE);
 +    String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
 +    TableName tn = TableName.valueOf(tableName);
 +    this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
-     HBaseAdmin admin = new HBaseAdmin(this.conf);
++    Admin admin = connection.getAdmin();
 +    try {
 +      family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
 +      if (family == null) {
 +        // this column family might be removed, directly return.
 +        throw new InvalidFamilyOperationException("Column family '" + familyName
 +            + "' does not exist. It might be removed.");
 +      }
 +    } finally {
 +      try {
 +        admin.close();
 +      } catch (IOException e) {
 +        LOG.warn("Fail to close the HBaseAdmin", e);
 +      }
 +    }
 +    // disable the block cache.
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    this.cacheConfig = new CacheConfig(copyOfConf);
 +
-     table = new HTable(this.conf, Bytes.toBytes(tableName));
-     table.setAutoFlush(false, false);
- 
-     table.setWriteBufferSize(1 * 1024 * 1024); // 1MB
++    table = connection.getBufferedMutator(
++        new BufferedMutatorParams(tn).writeBufferSize(1 * 1024 * 1024)); // 1MB write buffer
 +    memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
 +
 +    // The start time of the sweep tool.
 +    // Only mob files whose creation time is older than startTime-oneDay are handled by
 +    // the reducer, because handling the latest mob files could introduce inconsistencies.
 +    this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
 +    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
 +  }
 +
 +  private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
 +    return new SweepPartition(id, context);
 +  }
 +
 +  @Override
 +  public void run(Context context) throws IOException, InterruptedException {
 +    String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
 +    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
 +    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
 +    ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
 +        new DummyMobAbortable());
 +    FSDataOutputStream fout = null;
 +    try {
 +      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
 +      tracker.start();
 +      setup(context);
 +      // create a sequence file that contains all the file names visited by this reducer.
 +      String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
 +      Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
 +          .replace("-", MobConstants.EMPTY_STRING));
 +      fout = fs.create(nameFilePath, true);
 +      writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
 +          String.class, CompressionType.NONE, null);
 +      SweepPartitionId id;
 +      SweepPartition partition = null;
 +      // the mob files which have the same start key and date are in the same partition.
 +      while (context.nextKey()) {
 +        Text key = context.getCurrentKey();
 +        String keyString = key.toString();
 +        id = SweepPartitionId.create(keyString);
 +        if (null == partition || !id.equals(partition.getId())) {
 +          // this mob file starts a new partition.
 +          if (null != partition) {
 +            // the previous mob file belongs to a different partition; close that
 +            // partition before moving on.
 +            partition.close();
 +          }
 +          // create a new one
 +          partition = createPartition(id, context);
 +        }
 +        if (partition != null) {
 +          // run the partition
 +          partition.execute(key, context.getValues());
 +        }
 +      }
 +      if (null != partition) {
 +        partition.close();
 +      }
 +      writer.hflush();
 +    } catch (KeeperException e) {
 +      throw new IOException(e);
 +    } finally {
 +      cleanup(context);
 +      zkw.close();
 +      if (writer != null) {
 +        IOUtils.closeStream(writer);
 +      }
 +      if (fout != null) {
 +        IOUtils.closeStream(fout);
 +      }
 +      if (table != null) {
 +        try {
 +          table.close();
 +        } catch (IOException e) {
 +          LOG.warn(e);
 +        }
 +      }
++      if (connection != null) {
++        try {
++          connection.close();
++        } catch (IOException e) {
++          LOG.warn(e);
++        }
++      }
 +    }
 +
 +  }
 +
 +  /**
 +   * The mob files which have the same start key and date are in the same partition.
 +   * The files in the same partition are merged together into bigger ones.
 +   */
 +  public class SweepPartition {
 +
 +    private final SweepPartitionId id;
 +    private final Context context;
 +    private boolean memstoreUpdated = false;
 +    private boolean mergeSmall = false;
 +    private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
 +    private final List<Path> toBeDeleted = new ArrayList<Path>();
 +
 +    public SweepPartition(SweepPartitionId id, Context context) throws IOException {
 +      this.id = id;
 +      this.context = context;
 +      memstore.setPartitionId(id);
 +      init();
 +    }
 +
 +    public SweepPartitionId getId() {
 +      return this.id;
 +    }
 +
 +    /**
 +     * Prepares the map of files.
 +     *
 +     * @throws IOException
 +     */
 +    private void init() throws IOException {
 +      FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
 +      if (null == fileStats) {
 +        return;
 +      }
 +
 +      int smallFileCount = 0;
 +      float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
 +          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
 +      long compactionMergeableSize = conf.getLong(
 +          MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
 +          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
 +      // list the files. Just merge the hfiles, don't merge the hfile links.
 +      // prepare the map of mob files. The key is the file name, the value is the file status.
 +      for (FileStatus fileStat : fileStats) {
 +        MobFileStatus mobFileStatus = null;
 +        if (!HFileLink.isHFileLink(fileStat.getPath())) {
 +          mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
 +          if (mobFileStatus.needMerge()) {
 +            smallFileCount++;
 +          }
 +          // key is file name (not hfile name), value is hfile status.
 +          fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
 +        }
 +      }
 +      if (smallFileCount >= 2) {
 +        // merge the files only when there is more than one small file in the partition.
 +        this.mergeSmall = true;
 +      }
 +    }
 +
 +    /**
 +     * Flushes the data into mob files and store files, and archives the small
 +     * files after they're merged.
 +     * @throws IOException
 +     */
 +    public void close() throws IOException {
 +      if (null == id) {
 +        return;
 +      }
 +      // flush the remaining key values into mob files
 +      if (memstoreUpdated) {
 +        memstore.flushMemStore();
 +      }
 +      List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
 +      // delete the small files after compaction
 +      for (Path path : toBeDeleted) {
 +        LOG.info("[In Partition close] Delete the file " + path + " in partition close");
 +        storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
 +      }
 +      if (!storeFiles.isEmpty()) {
 +        try {
 +          MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
 +              storeFiles);
 +          context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
 +        } catch (IOException e) {
 +          LOG.error("Fail to archive the store files " + storeFiles, e);
 +        }
 +        storeFiles.clear();
 +      }
 +      fileStatusMap.clear();
 +    }
 +
 +    /**
 +     * Merges the small mob files into bigger ones.
 +     * @param fileName The current mob file name.
 +     * @param values The collection of KeyValues in this mob file.
 +     * @throws IOException
 +     */
 +    public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
 +      if (null == values) {
 +        return;
 +      }
 +      MobFileName mobFileName = MobFileName.create(fileName.toString());
 +      LOG.info("[In reducer] The file name: " + fileName.toString());
 +      MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
 +      if (null == mobFileStat) {
 +        LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
 +        return;
 +      }
 +      // only handle the files that are older than one day.
 +      if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
 +          <= mobCompactionDelay) {
 +        return;
 +      }
 +      // write the hfile name
 +      writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
 +      Set<KeyValue> kvs = new HashSet<KeyValue>();
 +      for (KeyValue kv : values) {
 +        if (kv.getValueLength() > Bytes.SIZEOF_INT) {
 +          mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
 +              Bytes.SIZEOF_INT));
 +        }
 +        kvs.add(kv.createKeyOnly(false));
 +      }
 +      // If the mob file is an invalid or a small one, merge it into new/bigger ones.
 +      if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
 +        context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
 +        MobFile file = MobFile.create(fs,
 +            new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
 +        StoreFileScanner scanner = null;
 +        try {
 +          scanner = file.getScanner();
 +          scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
 +          Cell cell;
 +          while (null != (cell = scanner.next())) {
 +            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +            KeyValue keyOnly = kv.createKeyOnly(false);
 +            if (kvs.contains(keyOnly)) {
 +              // write the KeyValue existing in HBase to the memstore.
 +              memstore.addToMemstore(kv);
 +              memstoreUpdated = true;
 +            }
 +          }
 +        } finally {
 +          if (scanner != null) {
 +            scanner.close();
 +          }
 +        }
 +        toBeDeleted.add(mobFileStat.getFileStatus().getPath());
 +      }
 +    }
 +
 +    /**
 +     * Lists the files with the same prefix.
 +     * @param p The file path.
 +     * @param prefix The prefix.
 +     * @return The files with the same prefix.
 +     * @throws IOException
 +     */
 +    private FileStatus[] listStatus(Path p, String prefix) throws IOException {
 +      return fs.listStatus(p, new PathPrefixFilter(prefix));
 +    }
 +  }
 +
 +  static class PathPrefixFilter implements PathFilter {
 +
 +    private final String prefix;
 +
 +    public PathPrefixFilter(String prefix) {
 +      this.prefix = prefix;
 +    }
 +
 +    public boolean accept(Path path) {
 +      return path.getName().startsWith(prefix, 0);
 +    }
 +
 +  }
 +
 +  /**
 +   * The sweep partition id.
 +   * It consists of the start key and date.
 +   * The start key is a hex string of the checksum of a region start key.
 +   * The date is the latest timestamp of cells in a mob file.
 +   */
 +  public static class SweepPartitionId {
 +    private String date;
 +    private String startKey;
 +
 +    public SweepPartitionId(MobFileName fileName) {
 +      this.date = fileName.getDate();
 +      this.startKey = fileName.getStartKey();
 +    }
 +
 +    public SweepPartitionId(String date, String startKey) {
 +      this.date = date;
 +      this.startKey = startKey;
 +    }
 +
 +    public static SweepPartitionId create(String key) {
 +      return new SweepPartitionId(MobFileName.create(key));
 +    }
 +
 +    @Override
 +    public boolean equals(Object anObject) {
 +      if (this == anObject) {
 +        return true;
 +      }
 +      if (anObject instanceof SweepPartitionId) {
 +        SweepPartitionId another = (SweepPartitionId) anObject;
 +        if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
 +          return true;
 +        }
 +      }
 +      return false;
 +    }
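 +
 +    // Pair the equals() above with a matching hashCode() to keep the
 +    // equals/hashCode contract. (Editorial sketch; not part of the original patch.)
 +    @Override
 +    public int hashCode() {
 +      return 31 * date.hashCode() + startKey.hashCode();
 +    }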
 +
 +    public String getDate() {
 +      return this.date;
 +    }
 +
 +    public String getStartKey() {
 +      return this.startKey;
 +    }
 +
 +    public void setDate(String date) {
 +      this.date = date;
 +    }
 +
 +    public void setStartKey(String startKey) {
 +      this.startKey = startKey;
 +    }
 +  }
 +
 +  /**
 +   * The mob file status used in the sweep reducer.
 +   */
 +  private static class MobFileStatus {
 +    private FileStatus fileStatus;
 +    private int validSize;
 +    private long size;
 +
 +    private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
 +    private long compactionMergeableSize =
 +        MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
 +
 +    /**
 +     * @param fileStatus The current FileStatus.
 +     * @param compactionRatio The invalid ratio. If too many cells have been deleted from
 +     * a mob file, the file is regarded as invalid and needs to be rewritten: a file with
 +     * existingCellsSize/fileSize < compactionRatio is regarded as invalid.
 +     * @param compactionMergeableSize If the size of a mob file is less than this value,
 +     * it is regarded as a small file and needs to be merged.
 +     */
 +    public MobFileStatus(FileStatus fileStatus, float compactionRatio,
 +        long compactionMergeableSize) {
 +      this.fileStatus = fileStatus;
 +      this.size = fileStatus.getLen();
 +      validSize = 0;
 +      this.compactionRatio = compactionRatio;
 +      this.compactionMergeableSize = compactionMergeableSize;
 +    }
 +
 +    /**
 +     * Adds to the valid (existing) size of this file.
 +     * @param size The size to be added.
 +     */
 +    public void addValidSize(int size) {
 +      this.validSize += size;
 +    }
 +
 +    /**
 +     * Whether the mob file needs to be cleaned.
 +     * If too many cells have been deleted from this mob file, it needs to be cleaned.
 +     * @return True if it needs to be cleaned.
 +     */
 +    public boolean needClean() {
 +      return validSize < compactionRatio * size;
 +    }
 +
 +    /**
 +     * Whether the mob file needs to be merged.
 +     * If this mob file is too small, it needs to be merged.
 +     * @return True if it needs to be merged.
 +     */
 +    public boolean needMerge() {
 +      return this.size < compactionMergeableSize;
 +    }
 +
 +    /**
 +     * Gets the file status.
 +     * @return The file status.
 +     */
 +    public FileStatus getFileStatus() {
 +      return fileStatus;
 +    }
 +  }
 +}
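
Besides the BufferedMutator change, the setup() above replaces "new HBaseAdmin(conf)"
with an Admin obtained from a shared Connection. A minimal sketch of that lifecycle
(the table and family names are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf("mytable"));
      HColumnDescriptor family = htd.getFamily(Bytes.toBytes("cf"));
      // inspect the family descriptor, e.g. family.isMobEnabled() on this branch
    }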

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 73b8cb9,73b8cb9..8ff4840
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@@ -85,7 -85,7 +85,7 @@@ public class DefaultStoreFlusher extend
        scanner.close();
      }
      LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
--        + StringUtils.humanReadableInt(snapshot.getSize()) +
++        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
          ", hasBloomFilter=" + writer.hasGeneralBloom() +
          ", into tmp file " + writer.getPath());
      result.add(writer.getPath());
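
The one-line change in this hunk swaps the deprecated StringUtils.humanReadableInt for
TraditionalBinaryPrefix.long2String from org.apache.hadoop.util.StringUtils. A small
illustration (the exact output formatting depends on the Hadoop version):

    import org.apache.hadoop.util.StringUtils;

    long size = 128L * 1024 * 1024;
    // formats a byte count with a binary-prefix suffix, e.g. something like "128 M"
    String readable = StringUtils.TraditionalBinaryPrefix.long2String(size, "", 1);
    System.out.println(readable);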

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ab0165d,e082698..6684309
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@@ -3276,34 -3421,12 +3422,30 @@@ public class HRegion implements HeapSiz
      Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
  
      SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
 -                                                        snapshotDir, desc, exnSnare);
 +            snapshotDir, desc, exnSnare);
      manifest.addRegion(this);
 +
 +    // The regionserver holding the first region of the table is responsible for taking the
 +    // manifest of the mob dir.
-     if (!Bytes.equals(getStartKey(), HConstants.EMPTY_START_ROW))
++    if (!Bytes.equals(getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW))
 +      return;
 +
 +    // if any column family has MOB enabled, add the "mob region" to the manifest.
-     Map<byte[], Store> stores = getStores();
-     for (Entry<byte[], Store> store : stores.entrySet()) {
-       boolean hasMobStore = store.getValue().getFamily().isMobEnabled();
++    List<Store> stores = getStores();
++    for (Store store : stores) {
++      boolean hasMobStore = store.getFamily().isMobEnabled();
 +      if (hasMobStore) {
 +        // use the .mob as the start key and 0 as the regionid
 +        HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(this.getTableDesc().getTableName());
 +        mobRegionInfo.setOffline(true);
 +        manifest.addMobRegion(mobRegionInfo, this.getTableDesc().getColumnFamilies());
 +        return;
 +      }
 +    }
    }
  
-   /**
-    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
-    * provided current timestamp.
-    * @throws IOException
-    */
-   void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
+   @Override
+   public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
        throws IOException {
      for (List<Cell> cells: cellItr) {
        if (cells == null) continue;
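
The mob-manifest logic above keys off HColumnDescriptor.isMobEnabled(). For context, a
family is typically marked MOB-enabled when it is defined; a hedged sketch (the
threshold value is illustrative only):

    HColumnDescriptor hcd = new HColumnDescriptor("cf");
    hcd.setMobEnabled(true);          // keep large values in separate mob files
    hcd.setMobThreshold(100 * 1024);  // values above ~100KB go to the mob store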

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 159ec55,8f7dee4..ea9558f
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@@ -549,27 -461,19 +560,28 @@@ class MetricsRegionServerWrapperImp
        long tempFlushedCellsSize = 0;
        long tempCompactedCellsSize = 0;
        long tempMajorCompactedCellsSize = 0;
 +      long tempMobCompactedIntoMobCellsCount = 0;
 +      long tempMobCompactedFromMobCellsCount = 0;
 +      long tempMobCompactedIntoMobCellsSize = 0;
 +      long tempMobCompactedFromMobCellsSize = 0;
 +      long tempMobFlushCount = 0;
 +      long tempMobFlushedCellsCount = 0;
 +      long tempMobFlushedCellsSize = 0;
 +      long tempMobScanCellsCount = 0;
 +      long tempMobScanCellsSize = 0;
        long tempBlockedRequestsCount = 0L;
  
-       for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
-         tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get();
-         tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
-         tempReadRequestsCount += r.readRequestsCount.get();
-         tempWriteRequestsCount += r.writeRequestsCount.get();
-         tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
-         tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
+       for (Region r : regionServer.getOnlineRegionsLocalContext()) {
+         tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL();
+         tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL();
+         tempReadRequestsCount += r.getReadRequestsCount();
+         tempWriteRequestsCount += r.getWriteRequestsCount();
+         tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
+         tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
          tempBlockedRequestsCount += r.getBlockedRequestsCount();
-         tempNumStores += r.stores.size();
-         for (Store store : r.stores.values()) {
+         List<Store> storeList = r.getStores();
+         tempNumStores += storeList.size();
+         for (Store store : storeList) {
            tempNumStoreFiles += store.getStorefilesCount();
            tempMemstoreSize += store.getMemStoreSize();
            tempStoreFileSize += store.getStorefilesSize();
@@@ -582,21 -486,13 +594,25 @@@
            tempFlushedCellsSize += store.getFlushedCellsSize();
            tempCompactedCellsSize += store.getCompactedCellsSize();
            tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize();
 +          if (store instanceof HMobStore) {
 +            HMobStore mobStore = (HMobStore) store;
 +            tempMobCompactedIntoMobCellsCount += mobStore.getMobCompactedIntoMobCellsCount();
 +            tempMobCompactedFromMobCellsCount += mobStore.getMobCompactedFromMobCellsCount();
 +            tempMobCompactedIntoMobCellsSize += mobStore.getMobCompactedIntoMobCellsSize();
 +            tempMobCompactedFromMobCellsSize += mobStore.getMobCompactedFromMobCellsSize();
 +            tempMobFlushCount += mobStore.getMobFlushCount();
 +            tempMobFlushedCellsCount += mobStore.getMobFlushedCellsCount();
 +            tempMobFlushedCellsSize += mobStore.getMobFlushedCellsSize();
 +            tempMobScanCellsCount += mobStore.getMobScanCellsCount();
 +            tempMobScanCellsSize += mobStore.getMobScanCellsSize();
 +          }
          }
  
-         hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
+         HDFSBlocksDistribution distro = r.getHDFSBlocksDistribution();
+         hdfsBlocksDistribution.add(distro);
+         if (r.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+           hdfsBlocksDistributionSecondaryRegions.add(distro);
+         }
        }
  
        float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index f7f0acd,0000000..5739df1
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@@ -1,80 -1,0 +1,80 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.NavigableSet;
 +
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +
 +/**
 + * Scanner that scans both the memstore and the MOB store, coalescing the KeyValue
 + * stream into a List<KeyValue> for a single row.
 + *
 + */
 +@InterfaceAudience.Private
 +public class MobStoreScanner extends StoreScanner {
 +
 +  private boolean cacheMobBlocks = false;
 +  private final HMobStore mobStore;
 +
 +  public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
 +      final NavigableSet<byte[]> columns, long readPt) throws IOException {
 +    super(store, scanInfo, scan, columns, readPt);
 +    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
 +    if (!(store instanceof HMobStore)) {
 +      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
 +    }
 +    mobStore = (HMobStore) store;
 +  }
 +
 +  /**
 +   * First reads the cells from HBase. If a cell is a reference cell (i.e. it carries
 +   * the reference tag), the scanner seeks the cell in the mob file and uses the cell
 +   * found there as the result.
 +   */
 +  @Override
-   public boolean next(List<Cell> outResult, int limit) throws IOException {
-     boolean result = super.next(outResult, limit);
++  public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException {
++    boolean result = super.next(outResult, ctx);
 +    if (!MobUtils.isRawMobScan(scan)) {
 +      // retrieve the mob data
 +      if (outResult.isEmpty()) {
 +        return result;
 +      }
 +      long mobKVCount = 0;
 +      long mobKVSize = 0;
 +      for (int i = 0; i < outResult.size(); i++) {
 +        Cell cell = outResult.get(i);
 +        if (MobUtils.isMobReferenceCell(cell)) {
 +          Cell mobCell = mobStore.resolve(cell, cacheMobBlocks);
 +          mobKVCount++;
 +          mobKVSize += mobCell.getValueLength();
 +          outResult.set(i, mobCell);
 +        }
 +      }
 +      mobStore.updateMobScanCellsCount(mobKVCount);
 +      mobStore.updateMobScanCellsSize(mobKVSize);
 +    }
 +    return result;
 +  }
 +}
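
MobUtils.isRawMobScan(scan) above lets a client skip mob resolution and read the raw
reference cells themselves. A hedged sketch of the client side, assuming the scan
attribute key MobConstants.MOB_SCAN_RAW and the boolean encoding that
MobUtils.isRawMobScan checks:

    Scan scan = new Scan();
    // ask for the raw reference cells instead of the resolved mob values
    // (attribute key and encoding are assumptions based on MobUtils)
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));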

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index 4c46218,0000000..85be382
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@@ -1,80 -1,0 +1,80 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.NavigableSet;
 +
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +
 +/**
 + * ReversedMobStoreScanner extends ReversedStoreScanner and supports reversed scanning
 + * in both the memstore and the MOB store.
 + *
 + */
 +@InterfaceAudience.Private
 +public class ReversedMobStoreScanner extends ReversedStoreScanner {
 +
 +  private boolean cacheMobBlocks = false;
 +  protected final HMobStore mobStore;
 +
 +  ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
 +      long readPt) throws IOException {
 +    super(store, scanInfo, scan, columns, readPt);
 +    cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
 +    if (!(store instanceof HMobStore)) {
 +      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
 +    }
 +    mobStore = (HMobStore) store;
 +  }
 +
 +  /**
 +   * First reads the cells from HBase. If a cell is a reference cell (i.e. it carries
 +   * the reference tag), the scanner seeks the cell in the mob file and uses the cell
 +   * found there as the result.
 +   */
 +  @Override
-   public boolean next(List<Cell> outResult, int limit) throws IOException {
-     boolean result = super.next(outResult, limit);
++  public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException {
++    boolean result = super.next(outResult, ctx);
 +    if (!MobUtils.isRawMobScan(scan)) {
 +      // retrieve the mob data
 +      if (outResult.isEmpty()) {
 +        return result;
 +      }
 +      long mobKVCount = 0;
 +      long mobKVSize = 0;
 +      for (int i = 0; i < outResult.size(); i++) {
 +        Cell cell = outResult.get(i);
 +        if (MobUtils.isMobReferenceCell(cell)) {
 +          Cell mobCell = mobStore.resolve(cell, cacheMobBlocks);
 +          mobKVCount++;
 +          mobKVSize += mobCell.getValueLength();
 +          outResult.set(i, mobCell);
 +        }
 +      }
 +      mobStore.updateMobScanCellsCount(mobKVCount);
 +      mobStore.updateMobScanCellsSize(mobKVSize);
 +    }
 +    return result;
 +  }
 +}
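
This scanner is used when a reversed scan hits a mob-enabled family; on the client side
that is just the standard reversed-scan API:

    Scan scan = new Scan();
    scan.setReversed(true); // rows are returned in descending order; mob reference
                            // cells are still resolved transparently by the scanner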

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index 27d53ba,0000000..60fc0ff
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@@ -1,251 -1,0 +1,252 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.client;
 +
 +import java.io.IOException;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.NamespaceDescriptor;
 +import org.apache.hadoop.hbase.NamespaceNotFoundException;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 +import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
 +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 +import org.apache.hadoop.hbase.testclassification.ClientTests;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +/**
 + * Test clone snapshots from the client
 + */
 +@Category({LargeTests.class, ClientTests.class})
 +public class TestMobCloneSnapshotFromClient {
 +  final Log LOG = LogFactory.getLog(getClass());
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  private final byte[] FAMILY = Bytes.toBytes("cf");
 +
 +  private byte[] emptySnapshot;
 +  private byte[] snapshotName0;
 +  private byte[] snapshotName1;
 +  private byte[] snapshotName2;
 +  private int snapshot0Rows;
 +  private int snapshot1Rows;
 +  private TableName tableName;
 +  private Admin admin;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
 +    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
 +    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
 +    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
 +    TEST_UTIL.getConfiguration().setBoolean(
 +        "hbase.master.enabletable.roundrobin", true);
 +    TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
 +    TEST_UTIL.startMiniCluster(3);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  /**
 +   * Initialize the tests with a table filled with some data
 +   * and two snapshots (snapshotName0, snapshotName1) of different states.
 +   * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
 +   */
 +  @Before
 +  public void setup() throws Exception {
 +    this.admin = TEST_UTIL.getHBaseAdmin();
 +
 +    long tid = System.currentTimeMillis();
 +    tableName = TableName.valueOf("testtb-" + tid);
 +    emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
 +    snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
 +    snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
 +    snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
 +
 +    // create Table and disable it
 +    MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
 +    admin.disableTable(tableName);
 +
 +    // take an empty snapshot
 +    admin.snapshot(emptySnapshot, tableName);
 +
-     HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++    Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
++    Table table = c.getTable(tableName);
 +    try {
 +      // enable table and insert data
 +      admin.enableTable(tableName);
 +      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +      snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
 +      admin.disableTable(tableName);
 +
 +      // take a snapshot
 +      admin.snapshot(snapshotName0, tableName);
 +
 +      // enable table and insert more data
 +      admin.enableTable(tableName);
 +      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +      snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
 +      admin.disableTable(tableName);
 +
 +      // take a snapshot of the updated table
 +      admin.snapshot(snapshotName1, tableName);
 +
 +      // re-enable table
 +      admin.enableTable(tableName);
 +    } finally {
 +      table.close();
++      c.close();
 +    }
 +  }
 +
 +  protected int getNumReplicas() {
 +    return 1;
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    if (admin.tableExists(tableName)) {
 +      TEST_UTIL.deleteTable(tableName);
 +    }
 +    SnapshotTestingUtils.deleteAllSnapshots(admin);
 +    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
 +  }
 +
 +  @Test(expected=SnapshotDoesNotExistException.class)
 +  public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
 +    String snapshotName = "random-snapshot-" + System.currentTimeMillis();
 +    TableName tableName = TableName.valueOf("random-table-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName, tableName);
 +  }
 +
 +  @Test(expected = NamespaceNotFoundException.class)
 +  public void testCloneOnMissingNamespace() throws IOException, InterruptedException {
 +    TableName clonedTableName = TableName.valueOf("unknownNS:clonetb");
 +    admin.cloneSnapshot(snapshotName1, clonedTableName);
 +  }
 +
 +  @Test
 +  public void testCloneSnapshot() throws IOException, InterruptedException {
 +    TableName clonedTableName = TableName.valueOf("clonedtb-" + System.currentTimeMillis());
 +    testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
 +    testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
 +    testCloneSnapshot(clonedTableName, emptySnapshot, 0);
 +  }
 +
 +  private void testCloneSnapshot(final TableName tableName, final byte[] snapshotName,
 +      int snapshotRows) throws IOException, InterruptedException {
 +    // create a new table from snapshot
 +    admin.cloneSnapshot(snapshotName, tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshotRows);
 +
 +    verifyReplicasCameOnline(tableName);
 +    TEST_UTIL.deleteTable(tableName);
 +  }
 +
 +  protected void verifyReplicasCameOnline(TableName tableName) throws IOException {
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  @Test
 +  public void testCloneSnapshotCrossNamespace() throws IOException, InterruptedException {
 +    String nsName = "testCloneSnapshotCrossNamespace";
 +    admin.createNamespace(NamespaceDescriptor.create(nsName).build());
 +    TableName clonedTableName =
 +        TableName.valueOf(nsName, "clonedtb-" + System.currentTimeMillis());
 +    testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
 +    testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
 +    testCloneSnapshot(clonedTableName, emptySnapshot, 0);
 +  }
 +
 +  /**
 +   * Verify that tables created from the snapshot are still alive after source table deletion.
 +   */
 +  @Test
 +  public void testCloneLinksAfterDelete() throws IOException, InterruptedException {
 +    // Clone a table from the first snapshot
 +    TableName clonedTableName = TableName.valueOf("clonedtb1-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName0, clonedTableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
 +
 +    // Take a snapshot of this cloned table.
 +    admin.disableTable(clonedTableName);
 +    admin.snapshot(snapshotName2, clonedTableName);
 +
 +    // Clone the snapshot of the cloned table
 +    TableName clonedTableName2 = TableName.valueOf("clonedtb2-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName2, clonedTableName2);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
 +    admin.disableTable(clonedTableName2);
 +
 +    // Remove the original table
 +    TEST_UTIL.deleteTable(tableName);
 +    waitCleanerRun();
 +
 +    // Verify the first cloned table
 +    admin.enableTable(clonedTableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
 +
 +    // Verify the second cloned table
 +    admin.enableTable(clonedTableName2);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
 +    admin.disableTable(clonedTableName2);
 +
 +    // Delete the first cloned table
 +    TEST_UTIL.deleteTable(clonedTableName);
 +    waitCleanerRun();
 +
 +    // Verify the second cloned table
 +    admin.enableTable(clonedTableName2);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
 +
 +    // Clone a new table from cloned
 +    TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName2, clonedTableName3);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName3, snapshot0Rows);
 +
 +    // Delete the cloned tables
 +    TEST_UTIL.deleteTable(clonedTableName2);
 +    TEST_UTIL.deleteTable(clonedTableName3);
 +    admin.deleteSnapshot(snapshotName2);
 +  }
 +
 +  // ==========================================================================
 +  //  Helpers
 +  // ==========================================================================
 +
 +  private void waitCleanerRun() throws InterruptedException {
 +    TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
 +  }
 +}
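
Note on the HTable replacements in the hunks above: the Configuration-based HTable
constructor is deprecated in favor of obtaining a Table from a Connection, but the
one-line replacement used throughout these tests never closes the Connection it
creates. Below is a minimal leak-free sketch, assuming the HBase 1.0 client API;
the class and method names are illustrative only, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ConnectionLifecycleSketch {
      public static void writeOneRow(Configuration conf, TableName tableName) throws Exception {
        // Connection and Table are both Closeable; try-with-resources releases
        // them even if the put fails.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(tableName)) {
          Put put = new Put(Bytes.toBytes("row"));
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
          table.put(put); // Table.put() flushes immediately; no flushCommits() needed
        }
      }
    }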

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
index 0bb498d,0000000..6fc2d28
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
@@@ -1,304 -1,0 +1,306 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.client;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.fail;
 +
 +import java.io.IOException;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.master.MasterFileSystem;
 +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
 +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 +import org.apache.hadoop.hbase.testclassification.ClientTests;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +/**
 + * Test restore snapshots from the client
 + */
 +@Category({ClientTests.class, LargeTests.class})
 +public class TestMobRestoreSnapshotFromClient {
 +  final Log LOG = LogFactory.getLog(getClass());
 +
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +
 +  private final byte[] FAMILY = Bytes.toBytes("cf");
 +
 +  private byte[] emptySnapshot;
 +  private byte[] snapshotName0;
 +  private byte[] snapshotName1;
 +  private byte[] snapshotName2;
 +  private int snapshot0Rows;
 +  private int snapshot1Rows;
 +  private TableName tableName;
 +  private Admin admin;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
 +    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
 +    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
 +    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
 +    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
 +    TEST_UTIL.getConfiguration().setBoolean(
 +        "hbase.master.enabletable.roundrobin", true);
 +    TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
 +    TEST_UTIL.startMiniCluster(3);
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  /**
 +   * Initialize the tests with a table filled with some data
 +   * and two snapshots (snapshotName0, snapshotName1) taken at different states.
 +   * The tableName, the snapshot names and the number of rows in each snapshot are initialized.
 +   */
 +  @Before
 +  public void setup() throws Exception {
 +    this.admin = TEST_UTIL.getHBaseAdmin();
 +
 +    long tid = System.currentTimeMillis();
 +    tableName =
 +        TableName.valueOf("testtb-" + tid);
 +    emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
 +    snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
 +    snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
 +    snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
 +
 +    // create Table and disable it
 +    MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
 +
 +    admin.disableTable(tableName);
 +
 +    // take an empty snapshot
 +    admin.snapshot(emptySnapshot, tableName);
 +
-     HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++    Table table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++            .getTable(tableName);
 +    // enable table and insert data
 +    admin.enableTable(tableName);
 +    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +    snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
 +    admin.disableTable(tableName);
 +
 +    // take a snapshot
 +    admin.snapshot(snapshotName0, tableName);
 +
 +    // enable table and insert more data
 +    admin.enableTable(tableName);
 +    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
 +    snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
 +    table.close();
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    TEST_UTIL.deleteTable(tableName);
 +    SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
 +    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
 +  }
 +
 +  @Test
 +  public void testRestoreSnapshot() throws IOException {
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
 +    admin.disableTable(tableName);
 +    admin.snapshot(snapshotName1, tableName);
 +    // Restore from snapshot-0
 +    admin.restoreSnapshot(snapshotName0);
 +    admin.enableTable(tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +
 +    // Restore from emptySnapshot
 +    admin.disableTable(tableName);
 +    admin.restoreSnapshot(emptySnapshot);
 +    admin.enableTable(tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, 0);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +
 +    // Restore from snapshot-1
 +    admin.disableTable(tableName);
 +    admin.restoreSnapshot(snapshotName1);
 +    admin.enableTable(tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +
 +    // Delete the table and restore again from snapshot-1
 +    TEST_UTIL.deleteTable(tableName);
 +    admin.restoreSnapshot(snapshotName1);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  protected int getNumReplicas() {
 +    return 1;
 +  }
 +
 +  @Test
 +  public void testRestoreSchemaChange() throws Exception {
 +    byte[] TEST_FAMILY2 = Bytes.toBytes("cf2");
 +
-     HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++    Table table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++            .getTable(tableName);
 +
 +    // Add one column family and put some data in it
 +    admin.disableTable(tableName);
 +    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
 +    hcd.setMobEnabled(true);
 +    hcd.setMobThreshold(3L);
 +    admin.addColumn(tableName, hcd);
 +    admin.enableTable(tableName);
 +    assertEquals(2, table.getTableDescriptor().getFamilies().size());
 +    HTableDescriptor htd = admin.getTableDescriptor(tableName);
 +    assertEquals(2, htd.getFamilies().size());
 +    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
 +    long snapshot2Rows = snapshot1Rows + 500;
 +    assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
 +    assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
 +    Set<String> fsFamilies = getFamiliesFromFS(tableName);
 +    assertEquals(2, fsFamilies.size());
 +
 +    // Take a snapshot
 +    admin.disableTable(tableName);
 +    admin.snapshot(snapshotName2, tableName);
 +
 +    // Restore the snapshot (without the cf)
 +    admin.restoreSnapshot(snapshotName0);
 +    admin.enableTable(tableName);
 +    assertEquals(1, table.getTableDescriptor().getFamilies().size());
 +    try {
 +      MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2);
 +      fail("family '" + Bytes.toString(TEST_FAMILY2) + "' should not exists");
 +    } catch (NoSuchColumnFamilyException e) {
 +      // expected
 +    }
 +    assertEquals(snapshot0Rows, MobSnapshotTestingUtils.countMobRows(table));
 +    htd = admin.getTableDescriptor(tableName);
 +    assertEquals(1, htd.getFamilies().size());
 +    fsFamilies = getFamiliesFromFS(tableName);
 +    assertEquals(1, fsFamilies.size());
 +
 +    // Restore back the snapshot (with the cf)
 +    admin.disableTable(tableName);
 +    admin.restoreSnapshot(snapshotName2);
 +    admin.enableTable(tableName);
 +    htd = admin.getTableDescriptor(tableName);
 +    assertEquals(2, htd.getFamilies().size());
 +    assertEquals(2, table.getTableDescriptor().getFamilies().size());
 +    assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
 +    assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
 +    fsFamilies = getFamiliesFromFS(tableName);
 +    assertEquals(2, fsFamilies.size());
 +    table.close();
 +  }
 +
 +  @Test
 +  public void testCloneSnapshotOfCloned() throws IOException, InterruptedException {
 +    TableName clonedTableName =
 +        TableName.valueOf("clonedtb-" + System.currentTimeMillis());
 +    admin.cloneSnapshot(snapshotName0, clonedTableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
 +    admin.disableTable(clonedTableName);
 +    admin.snapshot(snapshotName2, clonedTableName);
 +    TEST_UTIL.deleteTable(clonedTableName);
 +    waitCleanerRun();
 +
 +    admin.cloneSnapshot(snapshotName2, clonedTableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
 +    TEST_UTIL.deleteTable(clonedTableName);
 +  }
 +
 +  @Test
 +  public void testCloneAndRestoreSnapshot() throws IOException, InterruptedException {
 +    TEST_UTIL.deleteTable(tableName);
 +    waitCleanerRun();
 +
 +    admin.cloneSnapshot(snapshotName0, tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +    waitCleanerRun();
 +
 +    admin.disableTable(tableName);
 +    admin.restoreSnapshot(snapshotName0);
 +    admin.enableTable(tableName);
 +    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
 +    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
 +  }
 +
 +  @Test
 +  public void testCorruptedSnapshot() throws IOException, InterruptedException {
 +    SnapshotTestingUtils.corruptSnapshot(TEST_UTIL, Bytes.toString(snapshotName0));
 +    TableName cloneName = TableName.valueOf("corruptedClone-" + System.currentTimeMillis());
 +    try {
 +      admin.cloneSnapshot(snapshotName0, cloneName);
 +      fail("Expected CorruptedSnapshotException, got succeeded cloneSnapshot()");
 +    } catch (CorruptedSnapshotException e) {
 +      // Got the expected corruption exception.
 +      // check for no references of the cloned table.
 +      assertFalse(admin.tableExists(cloneName));
 +    } catch (Exception e) {
 +      fail("Expected CorruptedSnapshotException got: " + e);
 +    }
 +  }
 +
 +  // ==========================================================================
 +  //  Helpers
 +  // ==========================================================================
 +  private void waitCleanerRun() throws InterruptedException {
 +    TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
 +  }
 +
 +  private Set<String> getFamiliesFromFS(final TableName tableName) throws IOException {
 +    MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
 +    Set<String> families = new HashSet<String>();
 +    Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
 +    for (Path regionDir: FSUtils.getRegionDirs(mfs.getFileSystem(), tableDir)) {
 +      for (Path familyDir: FSUtils.getFamilyDirs(mfs.getFileSystem(), regionDir)) {
 +        families.add(familyDir.getName());
 +      }
 +    }
 +    return families;
 +  }
 +}
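
countMobRows and verifyMobRowCount come from MobSnapshotTestingUtils and are not
shown in this diff. A plausible sketch of such a row counter, assuming a plain
full-table Scan over the Table interface (MOB cells are resolved transparently on
read), might look like this:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class RowCountSketch {
      public static int countRows(Table table) throws IOException {
        Scan scan = new Scan(); // full-table scan; MOB cells come back like any other cell
        int count = 0;
        try (ResultScanner scanner = table.getScanner(scan)) {
          for (Result result : scanner) {
            if (!result.isEmpty()) {
              count++; // one Result per row
            }
          }
        }
        return count;
      }
    }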

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
index 612b98a,0000000..a2cd51c
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
@@@ -1,376 -1,0 +1,375 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.hbase.client;
 +
 +import java.util.List;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.HBaseTestingUtility;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +/**
 + * Test to verify that the cloned table is independent of the table from which it was cloned
 + */
 +@Category(LargeTests.class)
 +public class TestMobSnapshotCloneIndependence {
 +  private static final Log LOG = LogFactory.getLog(TestMobSnapshotCloneIndependence.class);
 +
 +  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 +
 +  private static final int NUM_RS = 2;
 +  private static final String STRING_TABLE_NAME = "test";
 +  private static final String TEST_FAM_STR = "fam";
 +  private static final byte[] TEST_FAM = Bytes.toBytes(TEST_FAM_STR);
 +  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
 +
 +  /**
 +   * Setup the config for the cluster and start it
 +   * @throws Exception on failure
 +   */
 +  @BeforeClass
 +  public static void setupCluster() throws Exception {
 +    setupConf(UTIL.getConfiguration());
 +    UTIL.startMiniCluster(NUM_RS);
 +  }
 +
 +  private static void setupConf(Configuration conf) {
 +    // enable snapshot support
 +    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
 +    // disable the ui
 +    conf.setInt("hbase.regionsever.info.port", -1);
 +    // change the flush size to a small amount, regulating number of store files
 +    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
 +    // make sure we get a compaction when doing a load, but keep around
 +    // some files in the store
 +    conf.setInt("hbase.hstore.compaction.min", 10);
 +    conf.setInt("hbase.hstore.compactionThreshold", 10);
 +    // block writes if we get to 12 store files
 +    conf.setInt("hbase.hstore.blockingStoreFiles", 12);
 +    conf.setInt("hbase.regionserver.msginterval", 100);
 +    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
 +    // Avoid potentially aggressive splitting which would cause snapshot to fail
 +    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
 +      ConstantSizeRegionSplitPolicy.class.getName());
 +    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
 +  }
 +
 +  @Before
 +  public void setup() throws Exception {
 +    MobSnapshotTestingUtils.createMobTable(UTIL, TableName.valueOf(STRING_TABLE_NAME), TEST_FAM);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    UTIL.deleteTable(TABLE_NAME);
 +    SnapshotTestingUtils.deleteAllSnapshots(UTIL.getHBaseAdmin());
 +    SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
 +  }
 +
 +  @AfterClass
 +  public static void cleanupTest() throws Exception {
 +    try {
 +      UTIL.shutdownMiniCluster();
 +    } catch (Exception e) {
 +      LOG.warn("failure shutting down cluster", e);
 +    }
 +  }
 +
 +  /**
 +   * Verify that adding data to the cloned table will not affect the original, and vice-versa when
 +   * it is taken as an online snapshot.
 +   */
 +  @Test (timeout=300000)
 +  public void testOnlineSnapshotAppendIndependent() throws Exception {
 +    runTestSnapshotAppendIndependent(true);
 +  }
 +
 +  /**
 +   * Verify that adding data to the cloned table will not affect the original, and vice-versa when
 +   * it is taken as an offline snapshot.
 +   */
 +  @Test (timeout=300000)
 +  public void testOfflineSnapshotAppendIndependent() throws Exception {
 +    runTestSnapshotAppendIndependent(false);
 +  }
 +
 +  /**
 +   * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
 +   * when it is taken as an online snapshot.
 +   */
 +  @Test (timeout=300000)
 +  public void testOnlineSnapshotMetadataChangesIndependent() throws Exception {
 +    runTestSnapshotMetadataChangesIndependent(true);
 +  }
 +
 +  /**
 +   * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
 +   * when it is taken as an offline snapshot.
 +   */
 +  @Test (timeout=300000)
 +  public void testOfflineSnapshotMetadataChangesIndependent() throws Exception {
 +    runTestSnapshotMetadataChangesIndependent(false);
 +  }
 +
 +  /**
 +   * Verify that region operations, in this case splitting a region, are independent between the
 +   * cloned table and the original.
 +   */
 +  @Test (timeout=300000)
 +  public void testOfflineSnapshotRegionOperationsIndependent() throws Exception {
 +    runTestRegionOperationsIndependent(false);
 +  }
 +
 +  /**
 +   * Verify that region operations, in this case splitting a region, are independent between the
 +   * cloned table and the original.
 +   */
 +  @Test (timeout=300000)
 +  public void testOnlineSnapshotRegionOperationsIndependent() throws Exception {
 +    runTestRegionOperationsIndependent(true);
 +  }
 +
 +  private static void waitOnSplit(final HTable t, int originalCount) throws Exception {
 +    for (int i = 0; i < 200; i++) {
 +      try {
 +        Thread.sleep(50);
 +      } catch (InterruptedException e) {
 +        // Restore the interrupted status
 +        Thread.currentThread().interrupt();
 +      }
 +      if (t.getRegionLocations().size() > originalCount) {
 +        return;
 +      }
 +    }
 +    throw new Exception("Split did not increase the number of regions");
 +  }
 +
 +  /*
 +   * Take a snapshot of a table, add data, and verify that this only
 +   * affects one table
 +   * @param online - Whether the table is online or not during the snapshot
 +   */
 +  private void runTestSnapshotAppendIndependent(boolean online) throws Exception {
 +    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
 +    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
 +
 +    Admin admin = UTIL.getHBaseAdmin();
 +    final long startTime = System.currentTimeMillis();
 +    final TableName localTableName =
 +        TableName.valueOf(STRING_TABLE_NAME + startTime);
 +
-     HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++    Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
 +    try {
 +
 +      SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
 +      final int origTableRowCount = MobSnapshotTestingUtils.countMobRows(original);
 +
 +      // Take a snapshot
 +      final String snapshotNameAsString = "snapshot_" + localTableName;
 +      byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
 +
 +      SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
 +        snapshotNameAsString, rootDir, fs, online);
 +
 +      if (!online) {
 +        admin.enableTable(localTableName);
 +      }
 +      TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
 +      admin.cloneSnapshot(snapshotName, cloneTableName);
 +
-       HTable clonedTable = new HTable(UTIL.getConfiguration(), cloneTableName);
++      Table clonedTable = ConnectionFactory.createConnection(UTIL.getConfiguration())
++              .getTable(cloneTableName);
 +
 +      try {
 +        final int clonedTableRowCount = MobSnapshotTestingUtils.countMobRows(clonedTable);
 +
 +        Assert.assertEquals(
 +          "The line counts of original and cloned tables do not match after clone. ",
 +          origTableRowCount, clonedTableRowCount);
 +
 +        // Attempt to add data to the original table
 +        final String rowKey = "new-row-" + System.currentTimeMillis();
 +
 +        Put p = new Put(Bytes.toBytes(rowKey));
-         p.add(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
++        p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
 +        original.put(p);
-         original.flushCommits();
 +
 +        // Verify that the put is present in the original table, but not in the clone
 +        Assert.assertEquals("The row count of the original table was not modified by the put",
 +          origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
 +        Assert.assertEquals(
 +          "The row count of the cloned table changed as a result of addition to the original",
 +          clonedTableRowCount, MobSnapshotTestingUtils.countMobRows(clonedTable));
 +
 +        p = new Put(Bytes.toBytes(rowKey));
-         p.add(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
++        p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
 +        clonedTable.put(p);
-         clonedTable.flushCommits();
 +
 +        // Verify that the put is visible in the clone but did not affect the original table
 +        Assert.assertEquals(
 +          "The row count of the original table was modified by the put to the clone",
 +          origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
 +        Assert.assertEquals("The row count of the cloned table was not modified by the put",
 +          clonedTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(clonedTable));
 +      } finally {
 +
 +        clonedTable.close();
 +      }
 +    } finally {
 +
 +      original.close();
 +    }
 +  }
 +
 +  /*
 +   * Take a snapshot of a table, do a split, and verify that this only affects one table
 +   * @param online - Whether the table is online or not during the snapshot
 +   */
 +  private void runTestRegionOperationsIndependent(boolean online) throws Exception {
 +    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
 +    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
 +
 +    // Create a table
 +    Admin admin = UTIL.getHBaseAdmin();
 +    final long startTime = System.currentTimeMillis();
 +    final TableName localTableName =
 +        TableName.valueOf(STRING_TABLE_NAME + startTime);
-     HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++    Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
 +    SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
 +    final int loadedTableCount = MobSnapshotTestingUtils.countMobRows(original);
 +    LOG.info("Original table has: " + loadedTableCount + " rows");
 +
 +    final String snapshotNameAsString = "snapshot_" + localTableName;
 +
 +    // Create a snapshot
 +    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
 +      snapshotNameAsString, rootDir, fs, online);
 +
 +    if (!online) {
 +      admin.enableTable(localTableName);
 +    }
 +
 +    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
 +
 +    // Clone the snapshot
 +    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
 +    admin.cloneSnapshot(snapshotName, cloneTableName);
 +
 +    // Verify that region information is the same pre-split
-     original.clearRegionCache();
++    ((HTable)original).clearRegionCache();
 +    List<HRegionInfo> originalTableHRegions = admin.getTableRegions(localTableName);
 +
 +    final int originalRegionCount = originalTableHRegions.size();
 +    final int cloneTableRegionCount = admin.getTableRegions(cloneTableName).size();
 +    Assert.assertEquals(
 +      "The number of regions in the cloned table is different than in the original table.",
 +      originalRegionCount, cloneTableRegionCount);
 +
 +    // Split a region on the parent table
 +    admin.splitRegion(originalTableHRegions.get(0).getRegionName());
-     waitOnSplit(original, originalRegionCount);
++    waitOnSplit((HTable)original, originalRegionCount);
 +
 +    // Verify that the cloned table region is not split
 +    final int cloneTableRegionCount2 = admin.getTableRegions(cloneTableName).size();
 +    Assert.assertEquals(
 +      "The number of regions in the cloned table changed though none of its regions were split.",
 +      cloneTableRegionCount, cloneTableRegionCount2);
 +  }
 +
 +  /*
 +   * Take a snapshot of a table, add metadata, and verify that this only
 +   * affects one table
 +   * @param online - Whether the table is online or not during the snapshot
 +   */
 +  private void runTestSnapshotMetadataChangesIndependent(boolean online) throws Exception {
 +    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
 +    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
 +
 +    // Create a table
 +    Admin admin = UTIL.getHBaseAdmin();
 +    final long startTime = System.currentTimeMillis();
 +    final TableName localTableName =
 +        TableName.valueOf(STRING_TABLE_NAME + startTime);
-     HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++    Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
 +    SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
 +
 +    final String snapshotNameAsString = "snapshot_" + localTableName;
 +
 +    // Create a snapshot
 +    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
 +      snapshotNameAsString, rootDir, fs, online);
 +
 +    if (!online) {
 +      admin.enableTable(localTableName);
 +    }
 +    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
 +
 +    // Clone the snapshot
 +    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
 +    admin.cloneSnapshot(snapshotName, cloneTableName);
 +
 +    // Add a new column family to the original table
 +    byte[] TEST_FAM_2 = Bytes.toBytes("fam2");
 +    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM_2);
 +
 +    admin.disableTable(localTableName);
 +    admin.addColumn(localTableName, hcd);
 +
 +    // Re-enable the table; the new family should appear in the original but not in the clone
 +    admin.enableTable(localTableName);
 +
 +    // get a description of the cloned table
 +    // get a list of its families
 +    // assert that the family is there
 +    HTableDescriptor originalTableDescriptor = original.getTableDescriptor();
 +    HTableDescriptor clonedTableDescriptor = admin.getTableDescriptor(cloneTableName);
 +
 +    Assert.assertTrue("The original family was not found. There is something wrong. ",
 +      originalTableDescriptor.hasFamily(TEST_FAM));
 +    Assert.assertTrue("The original family was not found in the clone. There is something wrong. ",
 +      clonedTableDescriptor.hasFamily(TEST_FAM));
 +
 +    Assert.assertTrue("The new family was not found. ",
 +      originalTableDescriptor.hasFamily(TEST_FAM_2));
 +    Assert.assertTrue("The new family was not found. ",
 +      !clonedTableDescriptor.hasFamily(TEST_FAM_2));
 +  }
 +}
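
The (HTable) casts in runTestRegionOperationsIndependent exist only to reach
clearRegionCache() and getRegionLocations(), both deprecated. A sketch of the same
split-wait loop written against the RegionLocator interface instead, assuming the
HBase 1.0 client API, avoids the casts; the class name below is illustrative:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    public class WaitOnSplitSketch {
      public static void waitOnSplit(Connection connection, TableName tableName,
          int originalCount) throws Exception {
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          for (int i = 0; i < 200; i++) {
            Thread.sleep(50);
            // getAllRegionLocations() reads region locations from hbase:meta,
            // so no explicit client-side cache clearing is needed here
            if (locator.getAllRegionLocations().size() > originalCount) {
              return;
            }
          }
        }
        throw new Exception("Split did not increase the number of regions");
      }
    }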