You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/07/22 21:53:04 UTC

[46/50] [abbrv] hbase git commit: HBASE-11339 Merge remote-tracking branch 'apache/hbase-11339' (Jingcheng Du)

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 07d51c0,3837522..7569e7a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@@ -202,7 -214,7 +202,6 @@@ public class HStore implements Store 
    protected HStore(final HRegion region, final HColumnDescriptor family,
        final Configuration confParam) throws IOException {
  
--    HRegionInfo info = region.getRegionInfo();
      this.fs = region.getRegionFileSystem();
  
      // Assemble the store's home directory and Ensure it exists.

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index ff8c308,cf0d3f5..040c9df
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@@ -35,7 -35,8 +35,9 @@@ import org.apache.hadoop.hbase.ServerNa
  import org.apache.hadoop.hbase.io.hfile.BlockCache;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
  import org.apache.hadoop.hbase.io.hfile.CacheStats;
 +import org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider;
+ import org.apache.hadoop.hbase.mob.MobCacheConfig;
+ import org.apache.hadoop.hbase.mob.MobFileCache;
  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  import org.apache.hadoop.hbase.util.FSUtils;
@@@ -441,12 -539,12 +540,8 @@@ class MetricsRegionServerWrapperImp
        HDFSBlocksDistribution hdfsBlocksDistributionSecondaryRegions =
            new HDFSBlocksDistribution();
  
--      long tempNumStores = 0;
--      long tempNumStoreFiles = 0;
--      long tempMemstoreSize = 0;
--      long tempStoreFileSize = 0;
--      long tempReadRequestsCount = 0;
--      long tempWriteRequestsCount = 0;
++      long tempNumStores = 0, tempNumStoreFiles = 0, tempMemstoreSize = 0, tempStoreFileSize = 0;
++      long tempReadRequestsCount = 0, tempWriteRequestsCount = 0;
        long tempCheckAndMutateChecksFailed = 0;
        long tempCheckAndMutateChecksPassed = 0;
        long tempStorefileIndexSize = 0;
@@@ -495,7 -614,7 +611,6 @@@
            hdfsBlocksDistributionSecondaryRegions.add(distro);
          }
        }
--
        float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
            regionServer.getServerName().getHostname());
        tempPercentFileLocal = (int) (localityIndex * 100);
@@@ -512,8 -631,8 +627,6 @@@
        if (lastRan == 0) {
          lastRan = currentTime - period;
        }
--
--
        //If we've time traveled keep the last requests per second.
        if ((currentTime - lastRan) > 0) {
          long currentRequestCount = getTotalRequestCount();

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java
index 0000000,fc14fa4..822b4e1
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java
@@@ -1,0 -1,66 +1,66 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
+ 
+ /**
+  * Scanner scans the MOB Store. Coalesce KeyValue stream into List<KeyValue>
+  * for a single row. It's only used in the compaction of mob-enabled columns.
+  * It outputs the normal cells and delete markers when outputDeleteMarkers is set as true.
+  */
+ @InterfaceAudience.Private
+ public class MobCompactionStoreScanner extends StoreScanner {
+ 
+   /*
+    * The delete markers are probably contained in the output of the scanner, for instance the
+    * minor compaction. If outputDeleteMarkers is set as true, these delete markers could be
+    * written to the del file, otherwise it's not allowed.
+    */
+   protected boolean outputDeleteMarkers;
+ 
+   /**
+    * Used for compactions.<p>
+    *
+    * Opens a scanner across specified StoreFiles.
+    * @param store who we scan
+    * @param scan the spec
+    * @param scanners ancillary scanners
+    * @param smallestReadPoint the readPoint that we should use for tracking
+    *          versions
+    */
+   public MobCompactionStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+       long earliestPutTs, boolean outputDeleteMarkers) throws IOException {
+     super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
+     this.outputDeleteMarkers = outputDeleteMarkers;
+   }
+ 
+   /**
+    * Gets whether the delete markers could be written to the del files.
+    * @return True if the delete markers could be written del files, false if it's not allowed.
+    */
+   public boolean isOutputDeleteMarkers() {
+     return this.outputDeleteMarkers;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
index 0000000,10aea24..83debf1
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
@@@ -1,0 -1,42 +1,42 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.Cell;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.filter.FilterBase;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ 
+ /**
+  * A filter that returns the cells which have mob reference tags. It's a server-side filter.
+  */
+ @InterfaceAudience.Private
+ class MobReferenceOnlyFilter extends FilterBase {
+ 
+   @Override
+   public ReturnCode filterKeyValue(Cell cell) {
+     if (null != cell) {
+       // If a cell with a mob reference tag, it's included.
+       if (MobUtils.isMobReferenceCell(cell)) {
+         return ReturnCode.INCLUDE;
+       }
+     }
+     return ReturnCode.SKIP;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index 0000000,46bbfd5..363da3e
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@@ -1,0 -1,85 +1,85 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.NavigableSet;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.Cell;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ 
+ /**
+  * Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into List<KeyValue>
+  * for a single row.
+  *
+  */
+ @InterfaceAudience.Private
+ public class MobStoreScanner extends StoreScanner {
+ 
+   private boolean cacheMobBlocks = false;
+   private boolean rawMobScan = false;
+   private boolean readEmptyValueOnMobCellMiss = false;
+   private final HMobStore mobStore;
+ 
+   public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+       final NavigableSet<byte[]> columns, long readPt) throws IOException {
+     super(store, scanInfo, scan, columns, readPt);
+     cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+     rawMobScan = MobUtils.isRawMobScan(scan);
+     readEmptyValueOnMobCellMiss = MobUtils.isReadEmptyValueOnMobCellMiss(scan);
+     if (!(store instanceof HMobStore)) {
+       throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
+     }
+     mobStore = (HMobStore) store;
+   }
+ 
+   /**
+    * Firstly reads the cells from the HBase. If the cell are a reference cell (which has the
+    * reference tag), the scanner need seek this cell from the mob file, and use the cell found
+    * from the mob file as the result.
+    */
+   @Override
+   public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException {
+     boolean result = super.next(outResult, ctx);
+     if (!rawMobScan) {
+       // retrieve the mob data
+       if (outResult.isEmpty()) {
+         return result;
+       }
+       long mobKVCount = 0;
+       long mobKVSize = 0;
+       for (int i = 0; i < outResult.size(); i++) {
+         Cell cell = outResult.get(i);
+         if (MobUtils.isMobReferenceCell(cell)) {
+           Cell mobCell = mobStore
+             .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
+           mobKVCount++;
+           mobKVSize += mobCell.getValueLength();
+           outResult.set(i, mobCell);
+         }
+       }
+       mobStore.updateMobScanCellsCount(mobKVCount);
+       mobStore.updateMobScanCellsSize(mobKVSize);
+     }
+     return result;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index 0000000,78c1720..5ba1226
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@@ -1,0 -1,85 +1,85 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.NavigableSet;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.Cell;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ 
+ /**
+  * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support
+  * reversed scanning in both the memstore and the MOB store.
+  *
+  */
+ @InterfaceAudience.Private
+ public class ReversedMobStoreScanner extends ReversedStoreScanner {
+ 
+   private boolean cacheMobBlocks = false;
+   private boolean rawMobScan = false;
+   private boolean readEmptyValueOnMobCellMiss = false;
+   protected final HMobStore mobStore;
+ 
+   ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+       long readPt) throws IOException {
+     super(store, scanInfo, scan, columns, readPt);
+     cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+     rawMobScan = MobUtils.isRawMobScan(scan);
+     readEmptyValueOnMobCellMiss = MobUtils.isReadEmptyValueOnMobCellMiss(scan);
+     if (!(store instanceof HMobStore)) {
+       throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
+     }
+     mobStore = (HMobStore) store;
+   }
+ 
+   /**
+    * Firstly reads the cells from the HBase. If the cell is a reference cell (which has the
+    * reference tag), the scanner need seek this cell from the mob file, and use the cell found
+    * from the mob file as the result.
+    */
+   @Override
+   public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException {
+     boolean result = super.next(outResult, ctx);
+     if (!rawMobScan) {
+       // retrieve the mob data
+       if (outResult.isEmpty()) {
+         return result;
+       }
+       long mobKVCount = 0;
+       long mobKVSize = 0;
+       for (int i = 0; i < outResult.size(); i++) {
+         Cell cell = outResult.get(i);
+         if (MobUtils.isMobReferenceCell(cell)) {
+           Cell mobCell = mobStore
+             .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
+           mobKVCount++;
+           mobKVSize += mobCell.getValueLength();
+           outResult.set(i, mobCell);
+         }
+       }
+       mobStore.updateMobScanCellsCount(mobKVCount);
+       mobStore.updateMobScanCellsSize(mobKVSize);
+     }
+     return result;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 78a5cac,25a4cd3..b64f40f
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@@ -45,7 -44,10 +45,9 @@@ import org.apache.hadoop.hbase.regionse
  import org.apache.hadoop.hbase.regionserver.StoreFile;
  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
  import org.apache.hadoop.hbase.regionserver.StoreScanner;
+ import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
  import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.Writables;
 -import org.apache.hadoop.util.StringUtils;
  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
  
@@@ -222,35 -234,20 +234,38 @@@ public abstract class Compactor 
      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
    }
  
 -  // TODO mob introduced the fd parameter; can we make this cleaner and easier to extend in future?
 +  /**
 +   * Used to prevent compaction name conflict when multiple compactions running parallel on the
 +   * same store.
 +   */
 +  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
 +
 +  private String generateCompactionName() {
 +    int counter;
 +    for (;;) {
 +      counter = NAME_COUNTER.get();
 +      int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
 +      if (NAME_COUNTER.compareAndSet(counter, next)) {
 +        break;
 +      }
 +    }
 +    return store.getRegionInfo().getRegionNameAsString() + "#"
 +        + store.getFamily().getNameAsString() + "#" + counter;
 +  }
++
    /**
     * Performs the compaction.
+    * @param fd FileDetails of cell sink writer
     * @param scanner Where to read from.
     * @param writer Where to write to.
     * @param smallestReadPoint Smallest read point.
 -   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
 +   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;= smallestReadPoint
+    * @param major Is a major compaction.
     * @return Whether compaction ended; false if it was interrupted for some reason.
     */
-   protected boolean performCompaction(InternalScanner scanner, CellSink writer,
+   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
        long smallestReadPoint, boolean cleanSeqId,
-       CompactionThroughputController throughputController) throws IOException {
+       CompactionThroughputController throughputController, boolean major) throws IOException {
      long bytesWritten = 0;
      long bytesWrittenProgress = 0;
      // Since scanner.next() can return 'false' but still be delivering data,

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index bc8dd01,68ce76a..cd169f4
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@@ -99,12 -99,10 +99,9 @@@ public class DefaultCompactor extends C
            cleanSeqId = true;
          }
  
-         // When all MVCC readpoints are 0, don't write them.
-         // See HBASE-8166, HBASE-12600, and HBASE-13389.
-         writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-           fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
-         boolean finished =
-             performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
+         writer = createTmpWriter(fd, smallestReadPoint);
 -        boolean finished =
 -            performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController,
 -                    request.isAllFiles());
++        boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
++          throughputController, request.isAllFiles());
          if (!finished) {
            writer.close();
            store.getFileSystem().delete(writer.getPath(), false);
@@@ -148,6 -146,24 +145,24 @@@
    }
  
    /**
+    * Creates a writer for a new file in a temporary directory.
+    * @param fd The file details.
+    * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+    * @return Writer for a new StoreFile in the tmp dir.
+    * @throws IOException
+    */
 -  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
++  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
++    throws IOException {
+     // When all MVCC readpoints are 0, don't write them.
+     // See HBASE-8166, HBASE-12600, and HBASE-13389.
+ 
+     // make this writer with tags always because of possible new cells with tags.
 -    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
++    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+             true, fd.maxMVCCReadpoint >= 0, fd.maxTagsLength >0);
 -    return writer;
+   }
+ 
+ 
+   /**
     * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
     * {@link #compact(CompactionRequest, CompactionThroughputController)};
     * @param filesToCompact the files to compact. These are used as the compactionSelection for

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index 5021c74,841bc04..132e187
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@@ -51,8 -51,11 +51,9 @@@ import org.apache.hadoop.hbase.HConstan
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.io.FileLink;
  import org.apache.hadoop.hbase.io.HFileLink;
 -import org.apache.hadoop.hbase.io.hfile.HFile;
 -import org.apache.hadoop.hbase.mapreduce.JobUtil;
  import org.apache.hadoop.hbase.io.WALLink;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+ import org.apache.hadoop.hbase.mob.MobUtils;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
@@@ -108,8 -110,8 +110,6 @@@ public class ExportSnapshot extends Con
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
  
--  private static final String INPUT_FOLDER_PREFIX = "export-files.";
--
    // Export Map-Reduce Counters, to keep track of the progress
    public enum Counter {
      MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
@@@ -507,9 -519,9 +517,8 @@@
          @Override
          public void storeFile(final HRegionInfo regionInfo, final String family,
              final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
--          if (storeFile.hasReference()) {
--            // copied as part of the manifest
--          } else {
++          // for storeFile.hasReference() case, copied as part of the manifest
++          if (!storeFile.hasReference()) {
              String region = regionInfo.getEncodedName();
              String hfile = storeFile.getName();
              Path path = HFileLink.createPath(table, region, family, hfile);

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
index 441dbbf,479c600..8e7a222
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
@@@ -49,6 -49,8 +49,7 @@@ import org.apache.hadoop.hbase.client.C
  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
  import org.apache.hadoop.hbase.io.HFileLink;
  import org.apache.hadoop.hbase.io.Reference;
 -import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.hbase.mob.MobUtils;
  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
@@@ -712,8 -791,8 +790,8 @@@ public class RestoreSnapshotHelper 
        Path restoreDir, String snapshotName) throws IOException {
      // ensure that restore dir is not under root dir
      if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) {
--      throw new IllegalArgumentException("Filesystems for restore directory and HBase root directory " +
--          "should be the same");
++      throw new IllegalArgumentException("Filesystems for restore directory and HBase root " +
++          "directory should be the same");
      }
      if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
        throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
index 606b9c9,6315b26..d91f046
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
@@@ -441,11 -458,14 +458,15 @@@ public final class SnapshotInfo extend
      }
  
      if (showStats) {
-       System.out.printf("%d HFiles (%d in archive), total size %s (%.2f%% %s shared with the source table)%n",
+       System.out.printf("%d HFiles (%d in archive, %d in mob storage), total size %s " +
+               "(%.2f%% %s shared with the source table, %.2f%% %s in mob dir)%n",
 -        stats.getStoreFilesCount(), stats.getArchivedStoreFilesCount(), stats.getMobStoreFilesCount(),
 +        stats.getStoreFilesCount(), stats.getArchivedStoreFilesCount(),
++        stats.getMobStoreFilesCount(),
          fileSizeToString(stats.getStoreFilesSize()),
          stats.getSharedStoreFilePercentage(),
-         fileSizeToString(stats.getSharedStoreFilesSize())
+         fileSizeToString(stats.getSharedStoreFilesSize()),
+         stats.getMobStoreFilePercentage(),
+         fileSizeToString(stats.getMobStoreFilesSize())
        );
        System.out.printf("%d Logs, total size %s%n",
          stats.getLogsCount(), fileSizeToString(stats.getLogsSize()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index d1e0236,5f3235b..b05caef
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@@ -43,7 -44,9 +44,8 @@@ import org.apache.hadoop.hbase.protobuf
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 -import org.apache.hadoop.hbase.util.FSUtils;
  import org.apache.hadoop.hbase.util.FSVisitor;
+ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
  
  /**
   * Utility methods for interacting with the snapshot referenced files.

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
index 0000000,5f1b85f..710ffe5
mode 000000,100644..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
@@@ -1,0 -1,433 +1,435 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hbase.client;
+ 
+ import java.util.List;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellUtil;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.HRegionInfo;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+ import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
+ import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
+ import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+ import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+ import org.apache.hadoop.hbase.testclassification.LargeTests;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ /**
+  * Test to verify that the cloned table is independent of the table from which it was cloned
+  */
+ @Category(LargeTests.class)
+ public class TestMobSnapshotCloneIndependence {
+   private static final Log LOG = LogFactory.getLog(TestSnapshotCloneIndependence.class);
+ 
+   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ 
+   private static final int NUM_RS = 2;
+   private static final String STRING_TABLE_NAME = "test";
+   private static final String TEST_FAM_STR = "fam";
+   private static final byte[] TEST_FAM = Bytes.toBytes(TEST_FAM_STR);
+   private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
+ 
+   /**
+    * Setup the config for the cluster and start it
+    * @throws Exception on failure
+    */
+   @BeforeClass
+   public static void setupCluster() throws Exception {
+     setupConf(UTIL.getConfiguration());
+     UTIL.startMiniCluster(NUM_RS);
+   }
+ 
+   private static void setupConf(Configuration conf) {
+     // enable snapshot support
+     conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+     // disable the ui
+     conf.setInt("hbase.regionsever.info.port", -1);
+     // change the flush size to a small amount, regulating number of store files
+     conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+     // so make sure we get a compaction when doing a load, but keep around
+     // some files in the store
+     conf.setInt("hbase.hstore.compaction.min", 10);
+     conf.setInt("hbase.hstore.compactionThreshold", 10);
+     // block writes if we get to 12 store files
+     conf.setInt("hbase.hstore.blockingStoreFiles", 12);
+     conf.setInt("hbase.regionserver.msginterval", 100);
+     conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+     // Avoid potentially aggressive splitting which would cause snapshot to fail
+     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+       ConstantSizeRegionSplitPolicy.class.getName());
+     conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+     conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, SnapshotHFileCleaner.class.getName() + ","
+       + HFileLinkCleaner.class.getName());
+   }
+ 
+   @Before
+   public void setup() throws Exception {
+     MobSnapshotTestingUtils.createMobTable(UTIL, TableName.valueOf(STRING_TABLE_NAME), TEST_FAM);
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+     UTIL.deleteTable(TABLE_NAME);
+     SnapshotTestingUtils.deleteAllSnapshots(UTIL.getHBaseAdmin());
+     SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
+   }
+ 
+   @AfterClass
+   public static void cleanupTest() throws Exception {
+     try {
+       UTIL.shutdownMiniCluster();
+     } catch (Exception e) {
+       LOG.warn("failure shutting down cluster", e);
+     }
+   }
+ 
+   /**
+    * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+    * it is taken as an online snapshot.
+    */
+   @Test (timeout=300000)
+   public void testOnlineSnapshotAppendIndependent() throws Exception {
+     runTestSnapshotAppendIndependent(true);
+   }
+ 
+   /**
+    * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+    * it is taken as an offline snapshot.
+    */
+   @Test (timeout=300000)
+   public void testOfflineSnapshotAppendIndependent() throws Exception {
+     runTestSnapshotAppendIndependent(false);
+   }
+ 
+   /**
+    * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+    * when it is taken as an online snapshot.
+    */
+   @Test (timeout=300000)
+   public void testOnlineSnapshotMetadataChangesIndependent() throws Exception {
+     runTestSnapshotMetadataChangesIndependent(true);
+   }
+ 
+   /**
+    * Verify that adding netadata to the cloned table will not affect the original, and vice-versa
+    * when is taken as an online snapshot.
+    */
+   @Test (timeout=300000)
+   public void testOfflineSnapshotMetadataChangesIndependent() throws Exception {
+     runTestSnapshotMetadataChangesIndependent(false);
+   }
+ 
+   /**
+    * Verify that region operations, in this case splitting a region, are independent between the
+    * cloned table and the original.
+    */
+   @Test (timeout=300000)
+   public void testOfflineSnapshotRegionOperationsIndependent() throws Exception {
+     runTestRegionOperationsIndependent(false);
+   }
+ 
+   /**
+    * Verify that region operations, in this case splitting a region, are independent between the
+    * cloned table and the original.
+    */
+   @Test (timeout=300000)
+   public void testOnlineSnapshotRegionOperationsIndependent() throws Exception {
+     runTestRegionOperationsIndependent(true);
+   }
+ 
+   /**
+    * Verify the mob cells still exist after the table to be cloned is deleted.
+    */
+   @Test (timeout=300000)
+   public void testDeleteTableToBeCloned() throws Exception {
+     FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+     Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+     TableName tn = TableName.valueOf("testDeleteTableToBeCloned");
+     byte[] qf = Bytes.toBytes("qf");
+     MobSnapshotTestingUtils.createMobTable(UTIL, tn, TEST_FAM);
+     String row = "row";
+     String value = "value";
+     Put put = new Put(Bytes.toBytes(row));
+     put.addColumn(TEST_FAM, qf, Bytes.toBytes(value));
+     Admin admin = UTIL.getHBaseAdmin();
+     BufferedMutator mutator = UTIL.getConnection().getBufferedMutator(tn);
+     mutator.mutate(put);
+     mutator.flush();
+     admin.flush(tn);
+     // Take a snapshot
+     final String snapshotNameAsString = "snapshot_" + tn;
+     byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+     Table table = ConnectionFactory.createConnection(UTIL.getConfiguration()).getTable(tn);
+     Table clonedTable = null;
+     try {
+       SnapshotTestingUtils.createSnapshotAndValidate(admin, tn, TEST_FAM_STR, snapshotNameAsString,
+         rootDir, fs, true);
+       TableName cloneTableName = TableName.valueOf("test-clone-" + tn);
+       admin.cloneSnapshot(snapshotName, cloneTableName);
+       clonedTable = ConnectionFactory.createConnection(UTIL.getConfiguration()).getTable(
+         cloneTableName);
+       admin.deleteSnapshot(snapshotName);
+       admin.disableTable(tn);
+       admin.deleteTable(tn);
+       // run the cleaner
+       UTIL.getHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+       // make sure the mob cell exists
+       Scan scan = new Scan();
+       ResultScanner scanner = clonedTable.getScanner(scan);
+       Result rs = scanner.next();
+       Cell cell = rs.getColumnLatestCell(TEST_FAM, qf);
+       Assert.assertEquals(value, Bytes.toString(CellUtil.cloneValue(cell)));
+       Assert.assertNull(scanner.next());
+     } finally {
+       table.close();
+       if (clonedTable != null) {
+         clonedTable.close();
+       }
+     }
+   }
+ 
 -  private static void waitOnSplit(final HTable t, int originalCount) throws Exception {
++  private static void waitOnSplit(Connection c, final Table t, int originalCount) throws Exception {
+     for (int i = 0; i < 200; i++) {
+       try {
+         Thread.sleep(50);
+       } catch (InterruptedException e) {
+         // Restore the interrupted status
+         Thread.currentThread().interrupt();
+       }
 -      if (t.getRegionLocations().size() > originalCount) {
 -        return;
++      try (RegionLocator locator = c.getRegionLocator(t.getName())) {
++        if (locator.getAllRegionLocations().size() > originalCount) {
++          return;
++        }
+       }
+     }
+     throw new Exception("Split did not increase the number of regions");
+   }
+ 
+   /*
+    * Take a snapshot of a table, add data, and verify that this only
+    * affects one table
+    * @param online - Whether the table is online or not during the snapshot
+    */
+   private void runTestSnapshotAppendIndependent(boolean online) throws Exception {
+     FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+     Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ 
+     Admin admin = UTIL.getHBaseAdmin();
+     final long startTime = System.currentTimeMillis();
+     final TableName localTableName =
+         TableName.valueOf(STRING_TABLE_NAME + startTime);
+ 
+     Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+     try {
+ 
+       SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+       final int origTableRowCount = MobSnapshotTestingUtils.countMobRows(original);
+ 
+       // Take a snapshot
+       final String snapshotNameAsString = "snapshot_" + localTableName;
+       byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+ 
+       SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+         snapshotNameAsString, rootDir, fs, online);
+ 
+       if (!online) {
+         admin.enableTable(localTableName);
+       }
+       TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+       admin.cloneSnapshot(snapshotName, cloneTableName);
+ 
+       Table clonedTable = ConnectionFactory.createConnection(UTIL.getConfiguration())
+               .getTable(cloneTableName);
+ 
+       try {
+         final int clonedTableRowCount = MobSnapshotTestingUtils.countMobRows(clonedTable);
+ 
+         Assert.assertEquals(
+           "The line counts of original and cloned tables do not match after clone. ",
+           origTableRowCount, clonedTableRowCount);
+ 
+         // Attempt to add data to the test
+         final String rowKey = "new-row-" + System.currentTimeMillis();
+ 
+         Put p = new Put(Bytes.toBytes(rowKey));
+         p.add(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+         original.put(p);
+ 
+         // Verify that it is not present in the original table
+         Assert.assertEquals("The row count of the original table was not modified by the put",
+           origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
+         Assert.assertEquals(
+           "The row count of the cloned table changed as a result of addition to the original",
+           clonedTableRowCount, MobSnapshotTestingUtils.countMobRows(clonedTable));
+ 
+         p = new Put(Bytes.toBytes(rowKey));
+         p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+         clonedTable.put(p);
+ 
+         // Verify that the new family is not in the restored table's description
+         Assert.assertEquals(
+           "The row count of the original table was modified by the put to the clone",
+           origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
+         Assert.assertEquals("The row count of the cloned table was not modified by the put",
+           clonedTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(clonedTable));
+       } finally {
+ 
+         clonedTable.close();
+       }
+     } finally {
+ 
+       original.close();
+     }
+   }
+ 
+   /*
+    * Take a snapshot of a table, do a split, and verify that this only affects one table
+    * @param online - Whether the table is online or not during the snapshot
+    */
+   private void runTestRegionOperationsIndependent(boolean online) throws Exception {
+     FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+     Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ 
+     // Create a table
+     Admin admin = UTIL.getHBaseAdmin();
+     final long startTime = System.currentTimeMillis();
+     final TableName localTableName =
+         TableName.valueOf(STRING_TABLE_NAME + startTime);
+     Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+     SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+     final int loadedTableCount = MobSnapshotTestingUtils.countMobRows(original);
+     System.out.println("Original table has: " + loadedTableCount + " rows");
+ 
+     final String snapshotNameAsString = "snapshot_" + localTableName;
+ 
+     // Create a snapshot
+     SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+       snapshotNameAsString, rootDir, fs, online);
+ 
+     if (!online) {
+       admin.enableTable(localTableName);
+     }
+ 
+     TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+ 
+     // Clone the snapshot
+     byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+     admin.cloneSnapshot(snapshotName, cloneTableName);
+ 
+     // Verify that region information is the same pre-split
+     ((HTable)original).clearRegionCache();
+     List<HRegionInfo> originalTableHRegions = admin.getTableRegions(localTableName);
+ 
+     final int originalRegionCount = originalTableHRegions.size();
+     final int cloneTableRegionCount = admin.getTableRegions(cloneTableName).size();
+     Assert.assertEquals(
+       "The number of regions in the cloned table is different than in the original table.",
+       originalRegionCount, cloneTableRegionCount);
+ 
+     // Split a region on the parent table
+     admin.splitRegion(originalTableHRegions.get(0).getRegionName());
 -    waitOnSplit((HTable)original, originalRegionCount);
++    waitOnSplit(UTIL.getConnection(), original, originalRegionCount);
+ 
+     // Verify that the cloned table region is not split
+     final int cloneTableRegionCount2 = admin.getTableRegions(cloneTableName).size();
+     Assert.assertEquals(
+       "The number of regions in the cloned table changed though none of its regions were split.",
+       cloneTableRegionCount, cloneTableRegionCount2);
+   }
+ 
+   /*
+    * Take a snapshot of a table, add metadata, and verify that this only
+    * affects one table
+    * @param online - Whether the table is online or not during the snapshot
+    */
+   private void runTestSnapshotMetadataChangesIndependent(boolean online) throws Exception {
+     FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+     Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ 
+     // Create a table
+     Admin admin = UTIL.getHBaseAdmin();
+     final long startTime = System.currentTimeMillis();
+     final TableName localTableName =
+         TableName.valueOf(STRING_TABLE_NAME + startTime);
+     Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+     SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+ 
+     final String snapshotNameAsString = "snapshot_" + localTableName;
+ 
+     // Create a snapshot
+     SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+       snapshotNameAsString, rootDir, fs, online);
+ 
+     if (!online) {
+       admin.enableTable(localTableName);
+     }
+     TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+ 
+     // Clone the snapshot
+     byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+     admin.cloneSnapshot(snapshotName, cloneTableName);
+ 
+     // Add a new column family to the original table
+     byte[] TEST_FAM_2 = Bytes.toBytes("fam2");
+     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM_2);
+ 
+     admin.disableTable(localTableName);
+     admin.addColumn(localTableName, hcd);
+ 
+     // Verify that it is not in the snapshot
+     admin.enableTable(localTableName);
+ 
+     // get a description of the cloned table
+     // get a list of its families
+     // assert that the family is there
+     HTableDescriptor originalTableDescriptor = original.getTableDescriptor();
+     HTableDescriptor clonedTableDescriptor = admin.getTableDescriptor(cloneTableName);
+ 
+     Assert.assertTrue("The original family was not found. There is something wrong. ",
+       originalTableDescriptor.hasFamily(TEST_FAM));
+     Assert.assertTrue("The original family was not found in the clone. There is something wrong. ",
+       clonedTableDescriptor.hasFamily(TEST_FAM));
+ 
+     Assert.assertTrue("The new family was not found. ",
+       originalTableDescriptor.hasFamily(TEST_FAM_2));
+     Assert.assertTrue("The new family was not found. ",
+       !clonedTableDescriptor.hasFamily(TEST_FAM_2));
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
index 0000000,7ad49bc..16e29be
mode 000000,100644..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
@@@ -1,0 -1,269 +1,270 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
+ import java.io.IOException;
+ import java.util.Random;
+ 
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HRegionInfo;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.client.*;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.testclassification.MediumTests;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ @Category(MediumTests.class)
+ public class TestDeleteMobTable {
+ 
+   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+   private final static byte[] FAMILY = Bytes.toBytes("family");
+   private final static byte[] QF = Bytes.toBytes("qualifier");
+   private static Random random = new Random();
+ 
+   @BeforeClass
+   public static void setUpBeforeClass() throws Exception {
+     TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+     TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+     TEST_UTIL.startMiniCluster(1);
+   }
+ 
+   @AfterClass
+   public static void tearDownAfterClass() throws Exception {
+     TEST_UTIL.shutdownMiniCluster();
+   }
+ 
+   /**
+    * Generate the mob value.
+    *
+    * @param size
+    *          the size of the value
+    * @return the mob value generated
+    */
+   private static byte[] generateMobValue(int size) {
+     byte[] mobVal = new byte[size];
+     random.nextBytes(mobVal);
+     return mobVal;
+   }
+ 
+   @Test
+   public void testDeleteMobTable() throws Exception {
+     byte[] tableName = Bytes.toBytes("testDeleteMobTable");
+     TableName tn = TableName.valueOf(tableName);
+     HTableDescriptor htd = new HTableDescriptor(tn);
+     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+     hcd.setMobEnabled(true);
+     hcd.setMobThreshold(0);
+     htd.addFamily(hcd);
+     HBaseAdmin admin = null;
+     Table table = null;
+     try {
+       admin = TEST_UTIL.getHBaseAdmin();
+       admin.createTable(htd);
+       table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
+       byte[] value = generateMobValue(10);
+ 
+       byte[] row = Bytes.toBytes("row");
+       Put put = new Put(row);
+       put.addColumn(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+       table.put(put);
+ 
+       admin.flush(tn);
+ 
+       // the mob file exists
+       Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+       String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
+       Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+       Assert.assertTrue(mobTableDirExist(tn));
+       table.close();
+ 
+       admin.disableTable(tn);
+       admin.deleteTable(tn);
+ 
+       Assert.assertFalse(admin.tableExists(tn));
+       Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+       Assert.assertFalse(mobTableDirExist(tn));
+     } finally {
+       if (admin != null) {
+         admin.close();
+       }
+     }
+   }
+ 
+   @Test
+   public void testDeleteNonMobTable() throws Exception {
+     byte[] tableName = Bytes.toBytes("testDeleteNonMobTable");
+     TableName tn = TableName.valueOf(tableName);
+     HTableDescriptor htd = new HTableDescriptor(tn);
+     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+     htd.addFamily(hcd);
+     HBaseAdmin admin = null;
+     Table table = null;
+     try {
+       admin = TEST_UTIL.getHBaseAdmin();
+       admin.createTable(htd);
+       table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
+       byte[] value = generateMobValue(10);
+ 
+       byte[] row = Bytes.toBytes("row");
+       Put put = new Put(row);
+       put.addColumn(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+       table.put(put);
+ 
+       admin.flush(tn);
+       table.close();
+ 
+       // the mob file doesn't exist
+       Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertFalse(mobTableDirExist(tn));
+ 
+       admin.disableTable(tn);
+       admin.deleteTable(tn);
+ 
+       Assert.assertFalse(admin.tableExists(tn));
+       Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertFalse(mobTableDirExist(tn));
+     } finally {
+       if (admin != null) {
+         admin.close();
+       }
+     }
+   }
 -  
++
+   @Test
+   public void testMobFamilyDelete() throws Exception {
+     byte[] tableName = Bytes.toBytes("testMobFamilyDelete");
+     TableName tn = TableName.valueOf(tableName);
+     HTableDescriptor htd = new HTableDescriptor(tn);
+     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+     hcd.setMobEnabled(true);
+     hcd.setMobThreshold(0);
+     htd.addFamily(hcd);
+     htd.addFamily(new HColumnDescriptor(Bytes.toBytes("family2")));
+     HBaseAdmin admin = null;
+     Table table = null;
+     try {
+       admin = TEST_UTIL.getHBaseAdmin();
+       admin.createTable(htd);
+       table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()).getTable(tn);
+       byte[] value = generateMobValue(10);
+       byte[] row = Bytes.toBytes("row");
+       Put put = new Put(row);
+       put.addColumn(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+       table.put(put);
+       admin.flush(tn);
+       // the mob file exists
+       Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+       String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
+       Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+       Assert.assertTrue(mobTableDirExist(tn));
+       admin.deleteColumnFamily(tn, FAMILY);
+       Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
+       Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+       Assert.assertFalse(mobColumnFamilyDirExist(tn));
+     } finally {
+       table.close();
+       if (admin != null) {
+         admin.close();
+       }
+       TEST_UTIL.deleteTable(tableName);
+     }
+   }
+ 
+   private int countMobFiles(TableName tn, String familyName) throws IOException {
+     FileSystem fs = TEST_UTIL.getTestFileSystem();
+     Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName);
+     if (fs.exists(mobFileDir)) {
+       return fs.listStatus(mobFileDir).length;
+     } else {
+       return 0;
+     }
+   }
+ 
+   private int countArchiveMobFiles(TableName tn, String familyName)
+       throws IOException {
+     FileSystem fs = TEST_UTIL.getTestFileSystem();
+     Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
+         MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
+     if (fs.exists(storePath)) {
+       return fs.listStatus(storePath).length;
+     } else {
+       return 0;
+     }
+   }
+ 
+   private boolean mobTableDirExist(TableName tn) throws IOException {
+     FileSystem fs = TEST_UTIL.getTestFileSystem();
+     Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
+     return fs.exists(tableDir);
+   }
 -  
++
+   private boolean mobColumnFamilyDirExist(TableName tn) throws IOException {
+     FileSystem fs = TEST_UTIL.getTestFileSystem();
+     Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
+     HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tn);
 -    Path mobFamilyDir = new Path(tableDir, new Path(mobRegionInfo.getEncodedName(), Bytes.toString(FAMILY)));
++    Path mobFamilyDir = new Path(tableDir, new Path(mobRegionInfo.getEncodedName(),
++      Bytes.toString(FAMILY)));
+     return fs.exists(mobFamilyDir);
+   }
+ 
+   private boolean mobArchiveExist(TableName tn, String familyName, String fileName)
+       throws IOException {
+     FileSystem fs = TEST_UTIL.getTestFileSystem();
+     Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
+         MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
+     return fs.exists(new Path(storePath, fileName));
+   }
+ 
+   private String assertHasOneMobRow(Table table, TableName tn, String familyName)
+       throws IOException {
+     Scan scan = new Scan();
+     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+     ResultScanner rs = table.getScanner(scan);
+     Result r = rs.next();
+     Assert.assertNotNull(r);
+     byte[] value = r.getValue(FAMILY, QF);
+     String fileName = Bytes.toString(value, Bytes.SIZEOF_INT, value.length - Bytes.SIZEOF_INT);
+     Path filePath = new Path(
+         MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName);
+     FileSystem fs = TEST_UTIL.getTestFileSystem();
+     Assert.assertTrue(fs.exists(filePath));
+     r = rs.next();
+     Assert.assertNull(r);
+     return fileName;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 0000000,0be79bf..4ed918c
mode 000000,100644..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@@ -1,0 -1,557 +1,557 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
+ import java.io.IOException;
+ import java.security.Key;
+ import java.security.SecureRandom;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.NavigableSet;
+ import java.util.concurrent.ConcurrentSkipListSet;
+ 
+ import javax.crypto.spec.SecretKeySpec;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.HarFileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellUtil;
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.HRegionInfo;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
+ import org.apache.hadoop.hbase.client.Get;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+ import org.apache.hadoop.hbase.io.crypto.aes.AES;
+ import org.apache.hadoop.hbase.io.hfile.HFile;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+ import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+ import org.apache.hadoop.hbase.security.EncryptionUtil;
+ import org.apache.hadoop.hbase.security.User;
+ import org.apache.hadoop.hbase.testclassification.MediumTests;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ import org.apache.hadoop.hbase.wal.WALFactory;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ import org.junit.rules.TestName;
+ import org.mockito.Mockito;
+ 
+ @Category(MediumTests.class)
+ public class TestHMobStore {
+   public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
+   @Rule public TestName name = new TestName();
+ 
+   private HMobStore store;
+   private HRegion region;
+   private HColumnDescriptor hcd;
+   private FileSystem fs;
+   private byte [] table = Bytes.toBytes("table");
+   private byte [] family = Bytes.toBytes("family");
+   private byte [] row = Bytes.toBytes("row");
+   private byte [] row2 = Bytes.toBytes("row2");
+   private byte [] qf1 = Bytes.toBytes("qf1");
+   private byte [] qf2 = Bytes.toBytes("qf2");
+   private byte [] qf3 = Bytes.toBytes("qf3");
+   private byte [] qf4 = Bytes.toBytes("qf4");
+   private byte [] qf5 = Bytes.toBytes("qf5");
+   private byte [] qf6 = Bytes.toBytes("qf6");
+   private byte[] value = Bytes.toBytes("value");
+   private byte[] value2 = Bytes.toBytes("value2");
+   private Path mobFilePath;
+   private Date currentDate = new Date();
+   private KeyValue seekKey1;
+   private KeyValue seekKey2;
+   private KeyValue seekKey3;
+   private NavigableSet<byte[]> qualifiers =
+     new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+   private List<Cell> expected = new ArrayList<Cell>();
+   private long id = System.currentTimeMillis();
+   private Get get = new Get(row);
+   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+   private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
+ 
+   /**
+    * Setup
+    * @throws Exception
+    */
+   @Before
+   public void setUp() throws Exception {
+     qualifiers.add(qf1);
+     qualifiers.add(qf3);
+     qualifiers.add(qf5);
+ 
+     Iterator<byte[]> iter = qualifiers.iterator();
+     while(iter.hasNext()){
+       byte [] next = iter.next();
+       expected.add(new KeyValue(row, family, next, 1, value));
+       get.addColumn(family, next);
+       get.setMaxVersions(); // all versions.
+     }
+   }
+ 
+   private void init(String methodName, Configuration conf, boolean testStore)
+   throws IOException {
+     hcd = new HColumnDescriptor(family);
+     hcd.setMobEnabled(true);
+     hcd.setMobThreshold(3L);
+     hcd.setMaxVersions(4);
+     init(methodName, conf, hcd, testStore);
+   }
+ 
+   private void init(String methodName, Configuration conf,
+       HColumnDescriptor hcd, boolean testStore) throws IOException {
+     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+     init(methodName, conf, htd, hcd, testStore);
+   }
+ 
+   private void init(String methodName, Configuration conf, HTableDescriptor htd,
+       HColumnDescriptor hcd, boolean testStore) throws IOException {
+     //Setting up tje Region and Store
+     Path basedir = new Path(DIR+methodName);
+     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
+     String logName = "logs";
+     Path logdir = new Path(basedir, logName);
+     FileSystem fs = FileSystem.get(conf);
+     fs.delete(logdir, true);
+ 
+     htd.addFamily(hcd);
+     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+ 
+     final Configuration walConf = new Configuration(conf);
+     FSUtils.setRootDir(walConf, basedir);
+     final WALFactory wals = new WALFactory(walConf, null, methodName);
+     region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
+             info, htd, null);
+     store = new HMobStore(region, hcd, conf);
+     if(testStore) {
+       init(conf, hcd);
+     }
+   }
+ 
+   private void init(Configuration conf, HColumnDescriptor hcd)
+       throws IOException {
+     Path basedir = FSUtils.getRootDir(conf);
+     fs = FileSystem.get(conf);
+     Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR
+         + Bytes.toString(family));
+     fs.mkdirs(homePath);
+ 
+     KeyValue key1 = new KeyValue(row, family, qf1, 1, value);
+     KeyValue key2 = new KeyValue(row, family, qf2, 1, value);
+     KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
+     KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
+     int maxKeyCount = keys.length;
+     StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
+         hcd.getCompactionCompression(), region.getRegionInfo().getStartKey());
+     mobFilePath = mobWriter.getPath();
+ 
+     mobWriter.append(key1);
+     mobWriter.append(key2);
+     mobWriter.append(key3);
+     mobWriter.close();
+ 
+     String targetPathName = MobUtils.formatDate(currentDate);
+     byte[] referenceValue = Bytes.toBytes(targetPathName + Path.SEPARATOR + mobFilePath.getName());
+     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName().getName());
+     KeyValue kv1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, referenceValue);
+     KeyValue kv2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, referenceValue);
+     KeyValue kv3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, referenceValue);
+     seekKey1 = MobUtils.createMobRefKeyValue(kv1, referenceValue, tableNameTag);
+     seekKey2 = MobUtils.createMobRefKeyValue(kv2, referenceValue, tableNameTag);
+     seekKey3 = MobUtils.createMobRefKeyValue(kv3, referenceValue, tableNameTag);
+   }
+ 
+   /**
+    * Getting data from memstore
+    * @throws IOException
+    */
+   @Test
+   public void testGetFromMemStore() throws IOException {
+     final Configuration conf = HBaseConfiguration.create();
+     init(name.getMethodName(), conf, false);
+ 
+     //Put data in memstore
+     this.store.add(new KeyValue(row, family, qf1, 1, value));
+     this.store.add(new KeyValue(row, family, qf2, 1, value));
+     this.store.add(new KeyValue(row, family, qf3, 1, value));
+     this.store.add(new KeyValue(row, family, qf4, 1, value));
+     this.store.add(new KeyValue(row, family, qf5, 1, value));
+     this.store.add(new KeyValue(row, family, qf6, 1, value));
+ 
+     Scan scan = new Scan(get);
+     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+         scan.getFamilyMap().get(store.getFamily().getName()),
+         0);
+ 
+     List<Cell> results = new ArrayList<Cell>();
+     scanner.next(results);
+     Collections.sort(results, KeyValue.COMPARATOR);
+     scanner.close();
+ 
+     //Compare
+     Assert.assertEquals(expected.size(), results.size());
+     for(int i=0; i<results.size(); i++) {
+       // Verify the values
+       Assert.assertEquals(expected.get(i), results.get(i));
+     }
+   }
+ 
+   /**
+    * Getting MOB data from files
+    * @throws IOException
+    */
+   @Test
+   public void testGetFromFiles() throws IOException {
+     final Configuration conf = TEST_UTIL.getConfiguration();
+     init(name.getMethodName(), conf, false);
+ 
+     //Put data in memstore
+     this.store.add(new KeyValue(row, family, qf1, 1, value));
+     this.store.add(new KeyValue(row, family, qf2, 1, value));
+     //flush
+     flush(1);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf3, 1, value));
+     this.store.add(new KeyValue(row, family, qf4, 1, value));
+     //flush
+     flush(2);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf5, 1, value));
+     this.store.add(new KeyValue(row, family, qf6, 1, value));
+     //flush
+     flush(3);
+ 
+     Scan scan = new Scan(get);
+     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+         scan.getFamilyMap().get(store.getFamily().getName()),
+         0);
+ 
+     List<Cell> results = new ArrayList<Cell>();
+     scanner.next(results);
+     Collections.sort(results, KeyValue.COMPARATOR);
+     scanner.close();
+ 
+     //Compare
+     Assert.assertEquals(expected.size(), results.size());
+     for(int i=0; i<results.size(); i++) {
+       Assert.assertEquals(expected.get(i), results.get(i));
+     }
+   }
+ 
+   /**
+    * Getting the reference data from files
+    * @throws IOException
+    */
+   @Test
+   public void testGetReferencesFromFiles() throws IOException {
+     final Configuration conf = HBaseConfiguration.create();
+     init(name.getMethodName(), conf, false);
+ 
+     //Put data in memstore
+     this.store.add(new KeyValue(row, family, qf1, 1, value));
+     this.store.add(new KeyValue(row, family, qf2, 1, value));
+     //flush
+     flush(1);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf3, 1, value));
+     this.store.add(new KeyValue(row, family, qf4, 1, value));
+     //flush
+     flush(2);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf5, 1, value));
+     this.store.add(new KeyValue(row, family, qf6, 1, value));
+     //flush
+     flush(3);
+ 
+     Scan scan = new Scan(get);
+     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+       scan.getFamilyMap().get(store.getFamily().getName()),
+       0);
+ 
+     List<Cell> results = new ArrayList<Cell>();
+     scanner.next(results);
+     Collections.sort(results, KeyValue.COMPARATOR);
+     scanner.close();
+ 
+     //Compare
+     Assert.assertEquals(expected.size(), results.size());
+     for(int i=0; i<results.size(); i++) {
+       Cell cell = results.get(i);
+       Assert.assertTrue(MobUtils.isMobReferenceCell(cell));
+     }
+   }
+ 
+   /**
+    * Getting data from memstore and files
+    * @throws IOException
+    */
+   @Test
+   public void testGetFromMemStoreAndFiles() throws IOException {
+ 
+     final Configuration conf = HBaseConfiguration.create();
+ 
+     init(name.getMethodName(), conf, false);
+ 
+     //Put data in memstore
+     this.store.add(new KeyValue(row, family, qf1, 1, value));
+     this.store.add(new KeyValue(row, family, qf2, 1, value));
+     //flush
+     flush(1);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf3, 1, value));
+     this.store.add(new KeyValue(row, family, qf4, 1, value));
+     //flush
+     flush(2);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf5, 1, value));
+     this.store.add(new KeyValue(row, family, qf6, 1, value));
+ 
+     Scan scan = new Scan(get);
+     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+         scan.getFamilyMap().get(store.getFamily().getName()),
+         0);
+ 
+     List<Cell> results = new ArrayList<Cell>();
+     scanner.next(results);
+     Collections.sort(results, KeyValue.COMPARATOR);
+     scanner.close();
+ 
+     //Compare
+     Assert.assertEquals(expected.size(), results.size());
+     for(int i=0; i<results.size(); i++) {
+       Assert.assertEquals(expected.get(i), results.get(i));
+     }
+   }
+ 
+   /**
+    * Getting data from memstore and files
+    * @throws IOException
+    */
+   @Test
+   public void testMobCellSizeThreshold() throws IOException {
+ 
+     final Configuration conf = HBaseConfiguration.create();
+ 
+     HColumnDescriptor hcd;
+     hcd = new HColumnDescriptor(family);
+     hcd.setMobEnabled(true);
+     hcd.setMobThreshold(100);
+     hcd.setMaxVersions(4);
+     init(name.getMethodName(), conf, hcd, false);
+ 
+     //Put data in memstore
+     this.store.add(new KeyValue(row, family, qf1, 1, value));
+     this.store.add(new KeyValue(row, family, qf2, 1, value));
+     //flush
+     flush(1);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf3, 1, value));
+     this.store.add(new KeyValue(row, family, qf4, 1, value));
+     //flush
+     flush(2);
+ 
+     //Add more data
+     this.store.add(new KeyValue(row, family, qf5, 1, value));
+     this.store.add(new KeyValue(row, family, qf6, 1, value));
+     //flush
+     flush(3);
+ 
+     Scan scan = new Scan(get);
+     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+       scan.getFamilyMap().get(store.getFamily().getName()),
+       0);
+ 
+     List<Cell> results = new ArrayList<Cell>();
+     scanner.next(results);
+     Collections.sort(results, KeyValue.COMPARATOR);
+     scanner.close();
+ 
+     //Compare
+     Assert.assertEquals(expected.size(), results.size());
+     for(int i=0; i<results.size(); i++) {
+       Cell cell = results.get(i);
+       //this is not mob reference cell.
+       Assert.assertFalse(MobUtils.isMobReferenceCell(cell));
+       Assert.assertEquals(expected.get(i), results.get(i));
+       Assert.assertEquals(100, store.getFamily().getMobThreshold());
+     }
+   }
+ 
+   @Test
+   public void testCommitFile() throws Exception {
+     final Configuration conf = HBaseConfiguration.create();
+     init(name.getMethodName(), conf, true);
+     String targetPathName = MobUtils.formatDate(new Date());
+     Path targetPath = new Path(store.getPath(), (targetPathName
+         + Path.SEPARATOR + mobFilePath.getName()));
+     fs.delete(targetPath, true);
+     Assert.assertFalse(fs.exists(targetPath));
+     //commit file
+     store.commitFile(mobFilePath, targetPath);
+     Assert.assertTrue(fs.exists(targetPath));
+   }
+ 
+   @Test
+   public void testResolve() throws Exception {
+     final Configuration conf = HBaseConfiguration.create();
+     init(name.getMethodName(), conf, true);
+     String targetPathName = MobUtils.formatDate(currentDate);
+     Path targetPath = new Path(store.getPath(), targetPathName);
+     store.commitFile(mobFilePath, targetPath);
+     //resolve
+     Cell resultCell1 = store.resolve(seekKey1, false);
+     Cell resultCell2 = store.resolve(seekKey2, false);
+     Cell resultCell3 = store.resolve(seekKey3, false);
+     //compare
+     Assert.assertEquals(Bytes.toString(value),
+         Bytes.toString(CellUtil.cloneValue(resultCell1)));
+     Assert.assertEquals(Bytes.toString(value),
+         Bytes.toString(CellUtil.cloneValue(resultCell2)));
+     Assert.assertEquals(Bytes.toString(value2),
+         Bytes.toString(CellUtil.cloneValue(resultCell3)));
+   }
+ 
+   /**
+    * Flush the memstore
+    * @param storeFilesSize
+    * @throws IOException
+    */
+   private void flush(int storeFilesSize) throws IOException{
+     this.store.snapshot();
+     flushStore(store, id++);
+     Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
+     Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
+   }
+ 
+   /**
+    * Flush the memstore
+    * @param store
+    * @param id
+    * @throws IOException
+    */
+   private static void flushStore(HMobStore store, long id) throws IOException {
+     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+     storeFlushCtx.prepare();
+     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+   }
+ 
+   @Test
+   public void testMOBStoreEncryption() throws Exception {
+     final Configuration conf = TEST_UTIL.getConfiguration();
+ 
+     conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
+     conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+     SecureRandom rng = new SecureRandom();
+     byte[] keyBytes = new byte[AES.KEY_LENGTH];
+     rng.nextBytes(keyBytes);
+     String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
+     Key cfKey = new SecretKeySpec(keyBytes, algorithm);
+ 
+     HColumnDescriptor hcd = new HColumnDescriptor(family);
+     hcd.setMobEnabled(true);
+     hcd.setMobThreshold(100);
+     hcd.setMaxVersions(4);
+     hcd.setEncryptionType(algorithm);
+     hcd.setEncryptionKey(EncryptionUtil.wrapKey(conf,
 -      conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey));
++      conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()),cfKey));
+ 
+     init(name.getMethodName(), conf, hcd, false);
+ 
+     this.store.add(new KeyValue(row, family, qf1, 1, value));
+     this.store.add(new KeyValue(row, family, qf2, 1, value));
+     this.store.add(new KeyValue(row, family, qf3, 1, value));
+     flush(1);
+ 
+     this.store.add(new KeyValue(row, family, qf4, 1, value));
+     this.store.add(new KeyValue(row, family, qf5, 1, value));
+     this.store.add(new KeyValue(row, family, qf6, 1, value));
+     flush(2);
+ 
+     Collection<StoreFile> storefiles = this.store.getStorefiles();
+     checkMobHFileEncrytption(storefiles);
+ 
+     // Scan the values
+     Scan scan = new Scan(get);
+     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+         scan.getFamilyMap().get(store.getFamily().getName()),
+         0);
+ 
+     List<Cell> results = new ArrayList<Cell>();
+     scanner.next(results);
+     Collections.sort(results, KeyValue.COMPARATOR);
+     scanner.close();
+     Assert.assertEquals(expected.size(), results.size());
+     for(int i=0; i<results.size(); i++) {
+       Assert.assertEquals(expected.get(i), results.get(i));
+     }
+ 
+     // Trigger major compaction
+     this.store.triggerMajorCompaction();
+     CompactionContext requestCompaction = this.store.requestCompaction(1, null);
+     this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE);
+     Assert.assertEquals(1, this.store.getStorefiles().size());
+ 
+     //Check encryption after compaction
+     checkMobHFileEncrytption(this.store.getStorefiles());
+   }
+ 
+   private void checkMobHFileEncrytption(Collection<StoreFile> storefiles) {
+     StoreFile storeFile = storefiles.iterator().next();
+     HFile.Reader reader = storeFile.getReader().getHFileReader();
+     byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
+     Assert.assertTrue(null != encryptionKey);
+     Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
+         .equals(HConstants.CIPHER_AES));
+   }
+ 
+ }