You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2020/04/08 19:16:51 UTC

[hbase] branch master updated: HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (#1446)

This is an automated email from the ASF dual-hosted git repository.

busbey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7df04  HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (#1446)
eb7df04 is described below

commit eb7df0498cf4699f1c8c449ddb98a2d470433bf4
Author: Sean Busbey <bu...@apache.org>
AuthorDate: Wed Apr 8 14:16:43 2020 -0500

    HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (#1446)
    
    * Reorganize MOB compaction tests for more reuse.
    * Add tests for mob compaction after snapshot clone operations
    * Note the original table used to write a given mob hfile and use that to find it later.
    
    Signed-off-by: Esteban Gutierrez <es...@apache.org>
---
 .../org/apache/hadoop/hbase/PrivateCellUtil.java   |   2 +-
 .../java/org/apache/hadoop/hbase/TableName.java    |  65 +++-
 .../hadoop/hbase/io/hfile/HFilePrettyPrinter.java  |   8 +-
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 127 +++++---
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |   6 +-
 .../hadoop/hbase/mob/MobFileCleanerChore.java      |  42 +--
 .../java/org/apache/hadoop/hbase/mob/MobUtils.java | 145 +++++++--
 .../hadoop/hbase/regionserver/HMobStore.java       |  53 ++--
 .../hadoop/hbase/regionserver/HStoreFile.java      |   3 +-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |  17 +-
 .../hadoop/hbase/mob/FaultyMobStoreCompactor.java  |  30 +-
 .../hadoop/hbase/mob/TestMobCompactionBase.java    | 232 --------------
 .../hadoop/hbase/mob/TestMobCompactionOptMode.java |  31 +-
 .../mob/TestMobCompactionOptRegionBatchMode.java   |  30 +-
 .../hbase/mob/TestMobCompactionRegularMode.java    |  75 -----
 .../TestMobCompactionRegularRegionBatchMode.java   |  29 +-
 .../hbase/mob/TestMobCompactionWithDefaults.java   | 335 +++++++++++++++++++++
 17 files changed, 714 insertions(+), 516 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
index f85fbef..2aadc42 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
@@ -915,7 +915,7 @@ public final class PrivateCellUtil {
    * Retrieve Cell's first tag, matching the passed in type
    * @param cell The Cell
    * @param type Type of the Tag to retrieve
-   * @return null if there is no tag of the passed in tag type
+   * @return Optional, empty if there is no tag of the passed in tag type
    */
   public static Optional<Tag> getTag(Cell cell, byte type) {
     boolean bufferBacked = cell instanceof ByteBufferExtendedCell;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index f096ef9..b659d14 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -24,9 +24,12 @@ import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 /**
  * Immutable POJO class for representing a table name.
  * Which is of the form:
@@ -146,9 +149,7 @@ public final class TableName implements Comparable<TableName> {
       throw new IllegalArgumentException("Name is null or empty");
     }
 
-    int namespaceDelimIndex =
-      org.apache.hbase.thirdparty.com.google.common.primitives.Bytes.lastIndexOf(tableName,
-        (byte) NAMESPACE_DELIM);
+    int namespaceDelimIndex = ArrayUtils.lastIndexOf(tableName, (byte) NAMESPACE_DELIM);
     if (namespaceDelimIndex < 0){
       isLegalTableQualifierName(tableName);
     } else {
@@ -433,33 +434,73 @@ public final class TableName implements Comparable<TableName> {
 
 
   /**
+   * @param fullName will use the entire byte array
    * @throws IllegalArgumentException if fullName equals old root or old meta. Some code
    *  depends on this. The test is buried in the table creation to save on array comparison
    *  when we're creating a standard table object that will be in the cache.
    */
   public static TableName valueOf(byte[] fullName) throws IllegalArgumentException{
+    return valueOf(fullName, 0, fullName.length);
+  }
+
+  /**
+   * @param fullName byte array holding the name, optionally prefixed by a namespace and ':'
+   * @param offset index within the array at which the name starts
+   * @param length number of bytes, starting at offset, that make up the name
+   * @throws IllegalArgumentException if the range is invalid or fullName is old root or old meta
+   */
+  public static TableName valueOf(byte[] fullName, int offset, int length)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(offset >= 0, "offset must be non-negative but was %s", offset);
+    Preconditions.checkArgument(offset < fullName.length, "offset (%s) must be < array length (%s)",
+        offset, fullName.length);
+    Preconditions.checkArgument(length >= 0 && length <= fullName.length - offset,
+        "length (%s) must be >= 0 and <= array length (%s) - offset", length, fullName.length);
     for (TableName tn : tableCache) {
-      if (Arrays.equals(tn.getName(), fullName)) {
+      final byte[] tnName = tn.getName();
+      if (Bytes.equals(tnName, 0, tnName.length, fullName, offset, length)) {
         return tn;
       }
     }
 
-    int namespaceDelimIndex =
-      org.apache.hbase.thirdparty.com.google.common.primitives.Bytes.lastIndexOf(fullName,
-        (byte) NAMESPACE_DELIM);
+    final int end = offset + length; // exclusive end of the name within fullName
+    int namespaceDelimIndex = ArrayUtils.lastIndexOf(fullName, (byte) NAMESPACE_DELIM, end - 1);
 
-    if (namespaceDelimIndex < 0) {
+    if (namespaceDelimIndex < offset) { // no delimiter inside the requested range
       return createTableNameIfNecessary(
           ByteBuffer.wrap(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
-          ByteBuffer.wrap(fullName));
+          ByteBuffer.wrap(fullName, offset, length));
     } else {
       return createTableNameIfNecessary(
-          ByteBuffer.wrap(fullName, 0, namespaceDelimIndex),
-          ByteBuffer.wrap(fullName, namespaceDelimIndex + 1,
-              fullName.length - (namespaceDelimIndex + 1)));
+          ByteBuffer.wrap(fullName, offset, namespaceDelimIndex - offset),
+          ByteBuffer.wrap(fullName, namespaceDelimIndex + 1, end - (namespaceDelimIndex + 1)));
     }
   }
 
+  /**
+   * @param fullname of a table, possibly with a leading namespace and ':' as delimiter.
+   * @throws IllegalArgumentException if fullname equals old root or old meta.
+   */
+  public static TableName valueOf(ByteBuffer fullname) {
+    fullname = fullname.duplicate(); // scan a duplicate so the caller's buffer state is untouched
+    fullname.mark();
+    boolean miss = true;
+    while (fullname.hasRemaining() && miss) {
+      miss = ((byte) NAMESPACE_DELIM) != fullname.get();
+    }
+    if (miss) {
+      fullname.reset();
+      return valueOf(null, fullname);
+    } else {
+      ByteBuffer qualifier = fullname.slice(); // the bytes after the first delimiter
+      int delimiterIndex = fullname.position() - 1;
+      fullname.reset();
+      // renamed view for clarity; limit() below cuts it off just before the delimiter
+      ByteBuffer namespace = fullname.duplicate();
+      namespace.limit(delimiterIndex);
+      return valueOf(namespace, qualifier);
+    }
+  }
 
   /**
    * @throws IllegalArgumentException if fullName equals old root or old meta. Some code
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 88f73f7..a8d1d1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TimeZone;
@@ -437,17 +438,16 @@ public class HFilePrettyPrinter extends Configured implements Tool {
       }
       // check if mob files are missing.
       if (checkMobIntegrity && MobUtils.isMobReferenceCell(cell)) {
-        Tag tnTag = MobUtils.getTableNameTag(cell);
-        if (tnTag == null) {
+        Optional<TableName> tn = MobUtils.getTableName(cell);
+        if (! tn.isPresent()) {
           System.err.println("ERROR, wrong tag format in mob reference cell "
             + CellUtil.getCellKeyAsString(cell));
         } else if (!MobUtils.hasValidMobRefCellValue(cell)) {
           System.err.println("ERROR, wrong value format in mob reference cell "
             + CellUtil.getCellKeyAsString(cell));
         } else {
-          TableName tn = TableName.valueOf(Tag.cloneValue(tnTag));
           String mobFileName = MobUtils.getMobFileName(cell);
-          boolean exist = mobFileExists(fs, tn, mobFileName,
+          boolean exist = mobFileExists(fs, tn.get(), mobFileName,
             Bytes.toString(CellUtil.cloneFamily(cell)), foundMobFiles, missingMobFiles);
           if (!exist) {
             // report error
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index c5ef6a6..45b9d80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -26,9 +26,9 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.Map.Entry;
+import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +37,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
@@ -60,7 +62,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * Compact passed set of files in the mob-enabled column family.
@@ -79,13 +84,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * content of it is written into meta section of a newly created store file at the final step of
    * compaction process.
    */
-
-  static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() {
-    @Override
-    protected Set<String> initialValue() {
-      return new HashSet<String>();
-    }
-  };
+  static ThreadLocal<SetMultimap<TableName,String>> mobRefSet =
+      ThreadLocal.withInitial(HashMultimap::create);
 
   /*
    * Is it user or system-originated request.
@@ -190,34 +190,71 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     // Check if I/O optimized MOB compaction
     if (ioOptimizedMode) {
       if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
-        Path mobDir =
-            MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
-        List<Path> mobFiles = MobUtils.getReferencedMobFiles(request.getFiles(), mobDir);
-        //reset disableIO
-        disableIO.set(Boolean.FALSE);
-        if (mobFiles.size() > 0) {
-          calculateMobLengthMap(mobFiles);
+        try {
+          final SetMultimap<TableName, String> mobRefs = request.getFiles().stream()
+              .map(file -> {
+                byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                ImmutableSetMultimap.Builder<TableName, String> builder;
+                if (value == null) {
+                  builder = ImmutableSetMultimap.builder();
+                } else {
+                  try {
+                    builder = MobUtils.deserializeMobFileRefs(value);
+                  } catch (RuntimeException exception) {
+                    throw new RuntimeException("failure getting mob references for hfile " + file,
+                        exception);
+                  }
+                }
+                return builder;
+              }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder)
+              .build();
+          //reset disableIO
+          disableIO.set(Boolean.FALSE);
+          if (!mobRefs.isEmpty()) {
+            calculateMobLengthMap(mobRefs);
+          }
+          LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "+
+              "Total referenced MOB files: {}", tableName, familyName, regionName, mobRefs.size());
+        } catch (RuntimeException exception) {
+          throw new IOException("Failed to get list of referenced hfiles for request " + request,
+              exception);
         }
-        LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "+
-            "Total referenced MOB files: {}", tableName, familyName, regionName, mobFiles.size());
       }
     }
 
     return compact(request, scannerFactory, writerFactory, throughputController, user);
   }
 
-  private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
+  /**
+   * @param mobRefs multimap of original table name -> mob hfile
+   */
+  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
     FileSystem fs = store.getFileSystem();
     HashMap<String, Long> map = mobLengthMap.get();
     map.clear();
-    for (Path p : mobFiles) {
-      if (MobFileName.isOldMobFileName(p.getName())) {
+    for (Entry<TableName, String> reference : mobRefs.entries()) {
+      final TableName table = reference.getKey();
+      final String mobfile = reference.getValue();
+      if (MobFileName.isOldMobFileName(mobfile)) {
         disableIO.set(Boolean.TRUE);
       }
-      FileStatus st = fs.getFileStatus(p);
-      long size = st.getLen();
-      LOG.debug("Referenced MOB file={} size={}", p, size);
-      map.put(p.getName(), fs.getFileStatus(p).getLen());
+      List<Path> locations = mobStore.getLocations(table);
+      for (Path p : locations) {
+        try {
+          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
+          long size = st.getLen();
+          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
+          map.put(mobfile, size);
+          break;
+        } catch (FileNotFoundException exception) {
+          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
+              p);
+        }
+      }
+      if (!map.containsKey(mobfile)) {
+        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of " +
+            "expected locations: " + locations);
+      }
     }
   }
 
@@ -391,8 +428,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
                     // We leave large MOB file as is (is not compacted),
                     // then we update set of MOB file references
                     // and append mob cell directly to the store's writer
-                    mobRefSet.get().add(fName);
-                    writer.append(mobCell);
+                    Optional<TableName> refTable = MobUtils.getTableName(c);
+                    if (refTable.isPresent()) {
+                      mobRefSet.get().put(refTable.get(), fName);
+                      writer.append(c);
+                    } else {
+                      throw new IOException(String.format("MOB cell did not contain a tablename " +
+                          "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                          "store=%s cell=%s", getStoreInfo(), c)); // String.format needs %s, not {}
+                    }
                   }
                 }
               } else {
@@ -440,9 +484,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
             if (MobUtils.hasValidMobRefCellValue(c)) {
               // We do not check mobSizeThreshold during normal compaction,
               // leaving it to a MOB compaction run
-              writer.append(c);
-              // Add MOB reference to a MOB reference set
-              mobRefSet.get().add(MobUtils.getMobFileName(c));
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                writer.append(c);
+              } else {
+                throw new IOException(String.format("MOB cell did not contain a tablename " +
+                    "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                    "store=%s cell=%s", getStoreInfo(), c)); // String.format needs %s, not {}
+              }
             } else {
               String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
               throw new IOException(errMsg);
@@ -529,7 +579,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       throughputController.finish(compactionName);
       if (!finished && mobFileWriter != null) {
         // Remove all MOB references because compaction failed
-        mobRefSet.get().clear();
+        clearThreadLocals();
         // Abort writer
         LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
           mobFileWriter.getPath(), getStoreInfo());
@@ -547,16 +597,13 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     return true;
   }
 
-  private String getStoreInfo() {
+  protected String getStoreInfo() {
     return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
       store.getColumnFamilyName(), store.getRegionInfo().getEncodedName()) ;
   }
 
   private void clearThreadLocals() {
-    Set<String> set = mobRefSet.get();
-    if (set != null) {
-      set.clear();
-    }
+    mobRefSet.get().clear();
     HashMap<String, Long> map = mobLengthMap.get();
     if (map != null) {
       map.clear();
@@ -571,7 +618,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
         getStoreInfo());
       // Add reference we get for compact MOB
-      mobRefSet.get().add(mobFileWriter.getPath().getName());
+      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
       return mobFileWriter;
     } catch (IOException e) {
       // Bailing out
@@ -604,7 +651,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
         LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
           mobFileWriter.getPath(), getStoreInfo());
         // Remove MOB file from reference set
-        mobRefSet.get().remove(mobFileWriter.getPath().getName());
+        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
         abortWriter(mobFileWriter);
       }
     } else {
@@ -619,9 +666,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
-    // Append MOB references
-    Set<String> refSet = mobRefSet.get();
-    writer.appendMobMetadata(refSet);
+    writer.appendMobMetadata(mobRefSet.get());
     writer.close();
     clearThreadLocals();
     return newFiles;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 5c4c602..c1d5736 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
@@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+
 /**
  * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
  * If the store is not a mob store, the flusher flushes the MemStore the same with
@@ -280,7 +283,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
     // The hfile is current up to and including cacheFlushSeqNum.
     status.setStatus("Flushing " + store + ": appending metadata");
     writer.appendMetadata(cacheFlushSeqNum, false);
-    writer.appendMobMetadata(mobRefSet.get());
+    writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String>builder()
+        .putAll(store.getTableName(), mobRefSet.get()).build());
     status.setStatus("Flushing " + store + ": closing flushed file");
     writer.close();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index 7f7d90c..8ea25b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mob;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * The class MobFileCleanerChore for running cleaner regularly to remove the expired
@@ -212,28 +212,28 @@ public class MobFileCleanerChore extends ScheduledChore {
                 byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                 // close store file to avoid memory leaks
                 sf.closeStoreFile(true);
-                if (mobRefData == null && bulkloadMarkerData == null) {
-                  LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
-                      + "can not proceed until all old files will be MOB-compacted.",
-                    pp);
-                  return;
-                } else if (mobRefData == null && bulkloadMarkerData != null) {
-                  LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
-                  continue;
-                }
-                // mobRefData will never be null here, but to make FindBugs happy
-                if (mobRefData != null && mobRefData.length > 1) {
-                  // if length = 1 means NULL, that there are no MOB references
-                  // in this store file, but the file was created by new MOB code
-                  String[] mobs = new String(mobRefData).split(",");
-                  if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found: {} mob references: {}", mobs.length, Arrays.toString(mobs));
+                if (mobRefData == null) {
+                  if (bulkloadMarkerData == null) {
+                    LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
+                        + "can not proceed until all old files will be MOB-compacted.",
+                      pp);
+                    return;
                   } else {
-                    LOG.debug("Found: {} mob references", mobs.length);
+                    LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
+                    continue;
                   }
-                  regionMobs.addAll(Arrays.asList(mobs));
-                } else {
-                  LOG.debug("File {} does not have mob references", currentPath);
+                }
+                // file may or may not have MOB references, but was created by the distributed
+                // mob compaction code.
+                try {
+                  SetMultimap<TableName, String> mobs = MobUtils.deserializeMobFileRefs(mobRefData)
+                      .build();
+                  LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
+                  LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
+                  regionMobs.addAll(mobs.values());
+                } catch (RuntimeException exception) {
+                  throw new IOException("failure getting mob references for hfile " + sf,
+                      exception);
                 }
               }
             } catch (FileNotFoundException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 3a6a55d..8ce63fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +67,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * The mob utilities
@@ -130,14 +131,51 @@ public final class MobUtils {
    * @param cell The current cell.
    * @return The table name tag.
    */
-  public static Tag getTableNameTag(Cell cell) {
+  private static Optional<Tag> getTableNameTag(Cell cell) {
+    Optional<Tag> tag = Optional.empty();
     if (cell.getTagsLength() > 0) {
-      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
-      if (tag.isPresent()) {
-        return tag.get();
+      tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
+    }
+    return tag;
+  }
+
+  /**
+   * Gets the table name from when this cell was written into a mob hfile as a string.
+   * @param cell to extract tag from
+   * @return table name as a string. empty if the tag is not found.
+   */
+  public static Optional<String> getTableNameString(Cell cell) {
+    Optional<Tag> tag = getTableNameTag(cell);
+    Optional<String> name = Optional.empty();
+    if (tag.isPresent()) {
+      name = Optional.of(Tag.getValueAsString(tag.get()));
+    }
+    return name;
+  }
+
+  /**
+   * Get the table name from when this cell was written into a mob hfile as a TableName.
+   * @param cell to extract tag from
+   * @return name of table as a TableName. empty if the tag is not found.
+   */
+  public static Optional<TableName> getTableName(Cell cell) {
+    Optional<Tag> maybe = getTableNameTag(cell);
+    Optional<TableName> name = Optional.empty();
+    if (maybe.isPresent()) {
+      final Tag tag = maybe.get();
+      if (tag.hasArray()) {
+        name = Optional.of(TableName.valueOf(tag.getValueArray(), tag.getValueOffset(),
+            tag.getValueLength()));
+      } else {
+        // TODO ByteBuffer handling in tags looks busted. revisit.
+        ByteBuffer buffer = tag.getValueByteBuffer().duplicate();
+        buffer.mark();
+        buffer.position(tag.getValueOffset());
+        buffer.limit(tag.getValueOffset() + tag.getValueLength());
+        name = Optional.of(TableName.valueOf(buffer));
       }
     }
-    return null;
+    return name;
   }
 
   /**
@@ -383,7 +421,7 @@ public final class MobUtils {
 
   /**
    * Gets the RegionInfo of the mob files. This is a dummy region. The mob files are not saved in a
-   * region in HBase. This is only used in mob snapshot. It's internally used only.
+   * region in HBase. It's internally used only.
    * @param tableName
    * @return A dummy mob region info.
    */
@@ -665,27 +703,78 @@ public final class MobUtils {
   }
 
   /**
-   * Get list of referenced MOB files from a given collection of store files
-   * @param storeFiles store files
-   * @param mobDir MOB file directory
-   * @return list of MOB file paths
-   */
-
-  public static List<Path> getReferencedMobFiles(Collection<HStoreFile> storeFiles, Path mobDir) {
-
-    Set<String> mobSet = new HashSet<String>();
-    for (HStoreFile sf : storeFiles) {
-      byte[] value = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-      if (value != null && value.length > 1) {
-        String s = Bytes.toString(value);
-        String[] all = s.split(",");
-        Collections.addAll(mobSet, all);
+   * Serialize a set of referenced mob hfiles
+   * @param mobRefSet to serialize, may be null
+   * @return byte array to i.e. put into store file metadata. will not be null
+   */
+  public static byte[] serializeMobFileRefs(SetMultimap<TableName, String> mobRefSet) {
+    if (mobRefSet != null && mobRefSet.size() > 0) {
+      // Here we rely on the fact that '/' and ',' are not allowed in either table names nor hfile
+      // names for serialization. Separators are written *before* every entry except the first,
+      // so none trail the output and deserializeMobFileRefs can split on "//" and ",":
+      // exampleTable/filename1,filename2//example:table/filename5//otherTable/filename3,filename4
+      //
+      // to approximate the needed capacity we use the fact that there will usually be 1 table name
+      // and each mob filename is around 105 bytes. we pick an arbitrary number to cover "most"
+      // single table name lengths
+      StringBuilder sb = new StringBuilder(100 + mobRefSet.size() * 105);
+      boolean doubleSlash = false;
+      for (TableName tableName : mobRefSet.keySet()) {
+        if (doubleSlash) {
+          sb.append("//");
+        } else {
+          doubleSlash = true;
+        }
+        sb.append(tableName).append("/");
+        boolean comma = false;
+        for (String refs : mobRefSet.get(tableName)) {
+          if (comma) {
+            sb.append(",");
+          } else {
+            comma = true;
+          }
+          sb.append(refs);
+        }
       }
+      return Bytes.toBytes(sb.toString());
+    } else {
+      return HStoreFile.NULL_VALUE;
     }
-    List<Path> retList = new ArrayList<Path>();
-    for (String name : mobSet) {
-      retList.add(new Path(mobDir, name));
+  }
+
+  /**
+   * Deserialize the set of referenced mob hfiles from store file metadata.
+   * @param bytes compatibly serialized data. can not be null
+   * @return builder for a setmultimap of original table to hfile names. empty if no values.
+   * @throws IllegalStateException if there are values but no table name
+   */
+  public static ImmutableSetMultimap.Builder<TableName, String> deserializeMobFileRefs(byte[] bytes)
+      throws IllegalStateException {
+    ImmutableSetMultimap.Builder<TableName, String> map = ImmutableSetMultimap.builder();
+    if (bytes.length > 1) {
+      // TODO avoid turning the tablename pieces in to strings.
+      String s = Bytes.toString(bytes);
+      String[] tables = s.split("//");
+      for (String tableEnc : tables) {
+        final int delim = tableEnc.indexOf('/');
+        if (delim <= 0) {
+          throw new IllegalStateException("MOB reference data does not match expected encoding: " +
+              "no table name included before list of mob refs.");
+        }
+        TableName table = TableName.valueOf(tableEnc.substring(0, delim));
+        String[] refs = tableEnc.substring(delim + 1).split(",");
+        map.putAll(table, refs);
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        // array length 1 should be the NULL_VALUE.
+        if (! Arrays.equals(HStoreFile.NULL_VALUE, bytes)) {
+          LOG.debug("Serialized MOB file refs array was treated as the placeholder 'no entries' but"
+              + " didn't have the expected placeholder byte. expected={} and actual={}",
+              Arrays.toString(HStoreFile.NULL_VALUE), Arrays.toString(bytes));
+        }
+      }
     }
-    return retList;
+    return map;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index d7f2ba3..5960b80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -55,7 +56,6 @@ import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -94,7 +94,7 @@ public class HMobStore extends HStore {
   private AtomicLong mobScanCellsCount = new AtomicLong();
   private AtomicLong mobScanCellsSize = new AtomicLong();
   private ColumnFamilyDescriptor family;
-  private Map<String, List<Path>> map = new ConcurrentHashMap<>();
+  private Map<TableName, List<Path>> map = new ConcurrentHashMap<>();
   private final IdLock keyLock = new IdLock();
   // When we add a MOB reference cell to the HFile, we will add 2 tags along with it
   // 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote this this cell is not
@@ -117,7 +117,7 @@ public class HMobStore extends HStore {
     TableName tn = region.getTableDescriptor().getTableName();
     locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
         .getEncodedName(), family.getNameAsString()));
-    map.put(Bytes.toString(tn.getName()), locations);
+    map.put(tn, locations);
     List<Tag> tags = new ArrayList<>(2);
     tags.add(MobConstants.MOB_REF_TAG);
     Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
@@ -315,26 +315,9 @@ public class HMobStore extends HStore {
     MobCell mobCell = null;
     if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
-      Tag tableNameTag = MobUtils.getTableNameTag(reference);
-      if (tableNameTag != null) {
-        String tableNameString = Tag.getValueAsString(tableNameTag);
-        List<Path> locations = map.get(tableNameString);
-        if (locations == null) {
-          IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
-          try {
-            locations = map.get(tableNameString);
-            if (locations == null) {
-              locations = new ArrayList<>(2);
-              TableName tn = TableName.valueOf(tableNameString);
-              locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
-              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
-                MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
-              map.put(tableNameString, locations);
-            }
-          } finally {
-            keyLock.releaseLockEntry(lockEntry);
-          }
-        }
+      Optional<TableName> tableName = MobUtils.getTableName(reference);
+      if (tableName.isPresent()) {
+        List<Path> locations = getLocations(tableName.get());
         mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
           readEmptyValueOnMobCellMiss);
       }
@@ -358,6 +341,30 @@ public class HMobStore extends HStore {
   }
 
   /**
+   * @param tableName to look up locations for, can not be null
+   * @return a list of location in order of working dir, archive dir. will not be null.
+   */
+  public List<Path> getLocations(TableName tableName) throws IOException {
+    List<Path> locations = map.get(tableName);
+    if (locations == null) {
+      IdLock.Entry lockEntry = keyLock.getLockEntry(tableName.hashCode());
+      try {
+        locations = map.get(tableName);
+        if (locations == null) {
+          locations = new ArrayList<>(2);
+          locations.add(MobUtils.getMobFamilyPath(conf, tableName, family.getNameAsString()));
+          locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tableName,
+            MobUtils.getMobRegionInfo(tableName).getEncodedName(), family.getNameAsString()));
+          map.put(tableName, locations);
+        }
+      } finally {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+    }
+    return locations;
+  }
+
+  /**
    * Reads the cell from a mob file.
    * The mob file might be located in different directories.
    * 1. The working directory.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 0f227be..0de0295 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -297,8 +297,7 @@ public class HStoreFile implements StoreFile {
   }
 
   /**
-   * Only used by the Striped Compaction Policy
-   * @param key
+   * @param key to look up
    * @return value associated with the metadata key
    */
   public byte[] getMetadataValue(byte[] key) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 3de97e8..10aa267 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -26,7 +26,6 @@ import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_K
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
-import static org.apache.hadoop.hbase.regionserver.HStoreFile.NULL_VALUE;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,7 +38,6 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,11 +45,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.BloomContext;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterUtil;
@@ -65,6 +65,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -248,17 +249,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
 
   /**
    * Appends MOB - specific metadata (even if it is empty)
-   * @param mobRefSet - set of MOB file names
+   * @param mobRefSet - original table -> set of MOB file names
    * @throws IOException problem writing to FS
    */
-  public void appendMobMetadata(Set<String> mobRefSet) throws IOException {
-    if (mobRefSet != null && mobRefSet.size() > 0) {
-      String sb = StringUtils.join(mobRefSet, ",");
-      byte[] bytes = Bytes.toBytes(sb.toString());
-      writer.appendFileInfo(MOB_FILE_REFS, bytes);
-    } else {
-      writer.appendFileInfo(MOB_FILE_REFS, NULL_VALUE);
-    }
+  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
+    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 02ace93..0f8852f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -164,7 +166,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
         // Add the only reference we get for compact MOB case
         // because new store file will have only one MOB reference
         // in this case - of newly compacted MOB file
-        mobRefSet.get().add(mobFileWriter.getPath().getName());
+        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
       }
       do {
         hasMore = scanner.next(cells, scannerContext);
@@ -237,9 +239,15 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
               if (size > mobSizeThreshold) {
                 // If the value size is larger than the threshold, it's regarded as a mob. Since
                 // its value is already in the mob file, directly write this cell to the store file
-                writer.append(c);
-                // Add MOB reference to a set
-                mobRefSet.get().add(MobUtils.getMobFileName(c));
+                Optional<TableName> refTable = MobUtils.getTableName(c);
+                if (refTable.isPresent()) {
+                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                  writer.append(c);
+                } else {
+                  throw new IOException(String.format("MOB cell did not contain a tablename " +
+                      "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                      "store={} cell={}", getStoreInfo(), c));
+                }
               } else {
                 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
                 // the mob cell from the mob file, and write it back to the store file.
@@ -255,9 +263,15 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
                   // directly write the cell to the store file, and leave it to be handled by the
                   // next compaction.
                   LOG.error("Empty value for: " + c);
-                  writer.append(c);
-                  // Add MOB reference to a set
-                  mobRefSet.get().add(MobUtils.getMobFileName(c));
+                  Optional<TableName> refTable = MobUtils.getTableName(c);
+                  if (refTable.isPresent()) {
+                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                    writer.append(c);
+                  } else {
+                    throw new IOException(String.format("MOB cell did not contain a tablename " +
+                        "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                        "store={} cell={}", getStoreInfo(), c));
+                  }
                 }
               }
             } else {
@@ -280,7 +294,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
             cellsCountCompactedToMob++;
             cellsSizeCompactedToMob += c.getValueLength();
             // Add ref we get for compact MOB case
-            mobRefSet.get().add(mobFileWriter.getPath().getName());
+            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
           }
 
           int len = c.getSerializedSize();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java
deleted file mode 100644
index b530df3..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.CompactionState;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RegionSplitter;
-import org.junit.After;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-  * Mob file compaction base test.
-  * 1. Enables batch mode for regular MOB compaction,
-  *    Sets batch size to 7 regions. (Optional)
-  * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
-  * 3. Creates MOB table with 20 regions
-  * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
-  * 5. Repeats 4. two more times
-  * 6. Verifies that we have 20 *3 = 60 mob files (equals to number of regions x 3)
-  * 7. Runs major MOB compaction.
-  * 8. Verifies that number of MOB files in a mob directory is 20 x4 = 80
-  * 9. Waits for a period of time larger than minimum age to archive
-  * 10. Runs Mob cleaner chore
-  * 11 Verifies that number of MOB files in a mob directory is 20.
-  * 12 Runs scanner and checks all 3 * 1000 rows.
- */
-@SuppressWarnings("deprecation")
-public abstract class TestMobCompactionBase {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMobCompactionBase.class);
-
-  protected HBaseTestingUtility HTU;
-
-  protected final static String famStr = "f1";
-  protected final static byte[] fam = Bytes.toBytes(famStr);
-  protected final static byte[] qualifier = Bytes.toBytes("q1");
-  protected final static long mobLen = 10;
-  protected final static byte[] mobVal = Bytes
-      .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
-
-  protected Configuration conf;
-  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
-  private ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
-  protected Admin admin;
-  protected Table table = null;
-  protected long minAgeToArchive = 10000;
-  protected int numRegions = 20;
-  protected int rows = 1000;
-
-  protected MobFileCleanerChore cleanerChore;
-
-  public TestMobCompactionBase() {
-  }
-
-
-  @Before
-  public void setUp() throws Exception {
-    HTU = new HBaseTestingUtility();
-    tableDescriptor = HTU.createModifyableTableDescriptor(getClass().getName());
-    conf = HTU.getConfiguration();
-
-    initConf();
-
-    HTU.startMiniCluster();
-    admin = HTU.getAdmin();
-    cleanerChore = new MobFileCleanerChore();
-    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
-    familyDescriptor.setMobEnabled(true);
-    familyDescriptor.setMobThreshold(mobLen);
-    familyDescriptor.setMaxVersions(1);
-    tableDescriptor.setColumnFamily(familyDescriptor);
-    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
-    byte[][] splitKeys = splitAlgo.split(numRegions);
-    table = HTU.createTable(tableDescriptor, splitKeys);
-
-  }
-
-  protected void initConf() {
-
-    conf.setInt("hfile.format.version", 3);
-    // Disable automatic MOB compaction
-    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
-    // Disable automatic MOB file cleaner chore
-    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
-    // Set minimum age to archive to 10 sec
-    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
-    // Set compacted file discharger interval to a half minAgeToArchive
-    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive/2);
-  }
-
-  private void loadData(int num) {
-
-    Random r = new Random();
-    try {
-      LOG.info("Started loading {} rows", num);
-      for (int i = 0; i < num; i++) {
-        byte[] key = new byte[32];
-        r.nextBytes(key);
-        Put p = new Put(key);
-        p.addColumn(fam, qualifier, mobVal);
-        table.put(p);
-      }
-      admin.flush(table.getName());
-      LOG.info("Finished loading {} rows", num);
-    } catch (Exception e) {
-      LOG.error("MOB file compaction chore test FAILED", e);
-      fail("MOB file compaction chore test FAILED");
-    }
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    admin.disableTable(tableDescriptor.getTableName());
-    admin.deleteTable(tableDescriptor.getTableName());
-    HTU.shutdownMiniCluster();
-  }
-
-
-  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
-
-    // Load and flush data 3 times
-    loadData(rows);
-    loadData(rows);
-    loadData(rows);
-    long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions * 3, num);
-    // Major MOB compact
-    mobCompact(admin, tableDescriptor, familyDescriptor);
-    // wait until compaction is complete
-    while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) {
-      Thread.sleep(100);
-    }
-
-    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions * 4, num);
-    // We have guarantee, that compacted file discharger will run during this pause
-    // because it has interval less than this wait time
-    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
-
-    Thread.sleep(minAgeToArchive + 1000);
-    LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    cleanerChore.cleanupObsoleteMobFiles(conf, table.getName());
-
-    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions, num);
-
-    long scanned = scanTable();
-    assertEquals(3 * rows, scanned);
-
-  }
-
-  protected abstract void mobCompact(Admin admin2, TableDescriptor tableDescriptor,
-      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException;
-
-
-  protected  long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
-      throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
-    FileStatus[] stat = fs.listStatus(dir);
-    for (FileStatus st : stat) {
-      LOG.debug("MOB Directory content: {}", st.getPath());
-    }
-    LOG.debug("MOB Directory content total files: {}", stat.length);
-
-    return stat.length;
-  }
-
-
-  protected long scanTable() {
-    try {
-
-      Result result;
-      ResultScanner scanner = table.getScanner(fam);
-      long counter = 0;
-      while ((result = scanner.next()) != null) {
-        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
-        counter++;
-      }
-      return counter;
-    } catch (Exception e) {
-      LOG.error("MOB file compaction test FAILED", e);
-      if (HTU != null) {
-        fail(e.getMessage());
-      } else {
-        System.exit(-1);
-      }
-    }
-    return 0;
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
index 414ca3e..09f7f58 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
@@ -20,12 +20,9 @@ package org.apache.hadoop.hbase.mob;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,37 +44,25 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptMode extends TestMobCompactionBase{
+public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestMobCompactionOptMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
 
-  public TestMobCompactionOptMode() {
-  }
-
-  @Override
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompaction() throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY,
       MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction generational (non-batch) mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction generational (non-batch) mode finished OK");
-
+    HTU.startMiniHBaseCluster();
   }
 
   @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
-      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
+  protected String description() {
+    return "generational (non-batch) mode";
   }
 
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
index 45fecc1..117b9ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.mob;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,46 +48,43 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionBase{
+public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestMobCompactionOptRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMobCompactionOptRegionBatchMode.class);
 
-  private int batchSize = 7;
+  private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
-  public TestMobCompactionOptRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompactionAndBatches()
+      throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY,
       MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
+    HTU.startMiniHBaseCluster();
   }
 
   @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
       ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
-    // Major compact with batch mode enabled
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
     compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, familyDescriptor);
   }
 
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction chore generational batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore generational batch mode finished OK");
-
+  @Override
+  protected String description() {
+    return "generational batch mode";
   }
 
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java
deleted file mode 100644
index 01b6804..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-  * Mob file compaction chore in a regular non-batch mode test.
-  * 1. Uses default (non-batch) mode for regular MOB compaction,
-  * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
-  * 3. Creates MOB table with 20 regions
-  * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
-  * 5. Repeats 4. two more times
-  * 6. Verifies that we have 20 *3 = 60 mob files (equals to number of regions x 3)
-  * 7. Runs major MOB compaction.
-  * 8. Verifies that number of MOB files in a mob directory is 20 x4 = 80
-  * 9. Waits for a period of time larger than minimum age to archive
-  * 10. Runs Mob cleaner chore
-  * 11 Verifies that number of MOB files in a mob directory is 20.
-  * 12 Runs scanner and checks all 3 * 1000 rows.
- */
-@SuppressWarnings("deprecation")
-@Category(LargeTests.class)
-public class TestMobCompactionRegularMode extends TestMobCompactionBase{
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMobCompactionRegularMode.class);
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMobCompactionRegularMode.class);
-
-  public TestMobCompactionRegularMode() {
-  }
-
-  @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
-      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction regular mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction regular mode finished OK");
-
-  }
-
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
index 819f095..5151789 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.mob;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,43 +48,39 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionBase{
+public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestMobCompactionRegularRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMobCompactionRegularRegionBatchMode.class);
 
-  private int batchSize = 7;
+  private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
-  public TestMobCompactionRegularRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureCompactionBatches() throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
+    HTU.startMiniHBaseCluster();
   }
 
   @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
       ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
-    // Major compact with batch mode enabled
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
     compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, familyDescriptor);
   }
 
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction chore regular batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore regular batch mode finished OK");
-
+  @Override
+  protected String description() {
+    return "regular batch mode";
   }
 
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
new file mode 100644
index 0000000..22fb31f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+  * Mob file compaction base test.
+  * 1. Enables batch mode for regular MOB compaction,
+  *    Sets batch size to 7 regions. (Optional)
+  * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
+  * 3. Creates MOB table with 20 regions
+  * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
+  * 5. Repeats 4. two more times
+  * 6. Verifies that we have 20 x 3 = 60 mob files (equal to the number of regions x 3)
+  * 7. Runs major MOB compaction.
+  * 8. Verifies that number of MOB files in a mob directory is 20 x 4 = 80
+  * 9. Waits for a period of time larger than minimum age to archive
+  * 10. Runs Mob cleaner chore
+  * 11. Verifies that number of MOB files in a mob directory is 20.
+  * 12. Runs scanner and checks all 3 * 1000 rows.
+ */
+@SuppressWarnings("deprecation")
+@Category(LargeTests.class)
+public class TestMobCompactionWithDefaults {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
+
+  protected static HBaseTestingUtility HTU;
+  protected static Configuration conf;
+  protected static long minAgeToArchive = 10000;
+
+  protected final static String famStr = "f1";
+  protected final static byte[] fam = Bytes.toBytes(famStr);
+  protected final static byte[] qualifier = Bytes.toBytes("q1");
+  protected final static long mobLen = 10;
+  protected final static byte[] mobVal = Bytes
+      .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
+
+  @Rule
+  public TestName test = new TestName();
+  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
+  private ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
+  protected Admin admin;
+  protected TableName table = null;
+  protected int numRegions = 20;
+  protected int rows = 1000;
+
+  protected MobFileCleanerChore cleanerChore;
+
+  @BeforeClass
+  public static void htuStart() throws Exception {
+    HTU = new HBaseTestingUtility();
+    conf = HTU.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    // Disable the periodic MOB compaction chore; tests trigger compaction explicitly
+    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
+    // Disable the periodic MOB file cleaner chore; tests run the cleaner explicitly
+    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
+    // Set the minimum age to archive to minAgeToArchive (10 sec)
+    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
+    // Set the compacted file discharger interval to half of minAgeToArchive
+    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive/2);
+    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
+    HTU.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void htuStop() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
+    admin = HTU.getAdmin();
+    cleanerChore = new MobFileCleanerChore();
+    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
+    familyDescriptor.setMobEnabled(true);
+    familyDescriptor.setMobThreshold(mobLen);
+    familyDescriptor.setMaxVersions(1);
+    tableDescriptor.setColumnFamily(familyDescriptor);
+    // Create the per-test table pre-split with keys computed from numRegions.
+    final byte[][] presplitKeys = new RegionSplitter.UniformSplit().split(numRegions);
+    table = HTU.createTable(tableDescriptor, presplitKeys).getName();
+  }
+
+  /** Loads {@code num} rows with random 32-byte keys and flushes; failures keep the cause. */
+  private void loadData(TableName tableName, int num) {
+    Random r = new Random();
+    LOG.info("Started loading {} rows into {}", num, tableName);
+    try (final Table table = HTU.getConnection().getTable(tableName)) {
+      for (int i = 0; i < num; i++) {
+        byte[] key = new byte[32];
+        r.nextBytes(key);
+        Put p = new Put(key);
+        p.addColumn(fam, qualifier, mobVal);
+        table.put(p);
+      }
+      admin.flush(tableName);
+      LOG.info("Finished loading {} rows into {}", num, tableName);
+    } catch (Exception e) {
+      LOG.error("MOB file compaction chore test FAILED", e);
+      throw new AssertionError("MOB file compaction chore test FAILED", e);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableDescriptor.getTableName());
+    admin.deleteTable(tableDescriptor.getTableName());
+  }
+
+  @Test
+  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
+    LOG.info("MOB compaction {} started", description());
+    loadAndFlushThreeTimes(rows, table, famStr);
+    mobCompact(tableDescriptor, familyDescriptor);
+    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
+        getNumberOfMobFiles(table, famStr));
+    cleanupAndVerifyCounts(table, famStr, 3*rows);
+    LOG.info("MOB compaction {} finished OK", description());
+  }
+
+  @Test
+  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
+    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    LOG.info("MOB compaction of cloned snapshot, {} started", description());
+    loadAndFlushThreeTimes(rows, table, famStr);
+    LOG.debug("Taking snapshot and cloning table {}", table);
+    admin.snapshot(test.getMethodName(), table);
+    admin.cloneSnapshot(test.getMethodName(), clone);
+    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
+        getNumberOfMobFiles(clone, famStr));
+    mobCompact(admin.getDescriptor(clone), familyDescriptor);
+    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
+        4 * numRegions, getNumberOfMobFiles(clone, famStr));
+    cleanupAndVerifyCounts(clone, famStr, 3*rows);
+    LOG.info("MOB compaction of cloned snapshot, {} finished OK", description());
+  }
+
+  @Test
+  public void testMobFileCompactionAfterSnapshotCloneAndFlush() throws InterruptedException,
+      IOException {
+    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    LOG.info("MOB compaction of cloned snapshot after flush, {} started", description());
+    loadAndFlushThreeTimes(rows, table, famStr);
+    LOG.debug("Taking snapshot and cloning table {}", table);
+    admin.snapshot(test.getMethodName(), table);
+    admin.cloneSnapshot(test.getMethodName(), clone);
+    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
+        getNumberOfMobFiles(clone, famStr));
+    loadAndFlushThreeTimes(rows, clone, famStr);
+    mobCompact(admin.getDescriptor(clone), familyDescriptor);
+    assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact",
+        7 * numRegions, getNumberOfMobFiles(clone, famStr));
+    cleanupAndVerifyCounts(clone, famStr, 6*rows);
+    LOG.info("MOB compaction of cloned snapshot w flush, {} finished OK", description());
+  }
+
+  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
+      throws IOException {
+    final long before = getNumberOfMobFiles(table, family);
+    // Each loadData call ends with a flush, so three rounds should add three files per region.
+    for (int round = 0; round < 3; round++) {
+      loadData(table, rows);
+    }
+    assertEquals("Should have 3 more mob files per region from flushing.", before + numRegions * 3,
+        getNumberOfMobFiles(table, family));
+  }
+
+  protected String description() {
+    return "regular mode";
+  }
+
+  protected void enableCompactions() throws IOException {
+    // Switch compactions on for every region server the admin currently reports.
+    admin.compactionSwitch(true, admin.getRegionServers().stream()
+        .map(server -> server.getServerName()).collect(Collectors.toList()));
+  }
+
+  protected void disableCompactions() throws IOException {
+    // Switch compactions off for every region server the admin currently reports.
+    admin.compactionSwitch(false, admin.getRegionServers().stream()
+        .map(server -> server.getServerName()).collect(Collectors.toList()));
+  }
+
+  /** Compacts the table and waits for completion; expects compactions disabled on entry. */
+  protected void mobCompact(TableDescriptor tableDescriptor,
+      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
+    LOG.debug("Major compact MOB table {}", tableDescriptor.getTableName());
+    enableCompactions();
+    try {
+      mobCompactImpl(tableDescriptor, familyDescriptor);
+      waitUntilCompactionIsComplete(tableDescriptor.getTableName());
+    } finally {
+      // Restore the disabled state even on failure so later tests start from a known baseline.
+      disableCompactions();
+    }
+  }
+
+  /**
+   * Triggers compaction through the API specific to the test set; this default implementation
+   * requests a major compaction of the given family via the admin. Should not wait for
+   * compactions to finish. May assume compactions are enabled when called.
+   */
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
+      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
+    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
+  }
+
+  protected void waitUntilCompactionIsComplete(TableName table)
+      throws IOException, InterruptedException {
+    // Poll every 100ms until the admin reports CompactionState.NONE for the table.
+    for (CompactionState current = admin.getCompactionState(table);
+        current != CompactionState.NONE; current = admin.getCompactionState(table)) {
+      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, current);
+      Thread.sleep(100);
+    }
+    LOG.debug("done waiting for compaction on {}", table);
+  }
+
+  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
+      throws InterruptedException, IOException {
+    // Sleep past the minimum archive age. The compacted file discharger interval is configured
+    // shorter than this pause, so it gets a chance to run before the cleaner does.
+    final long pauseMillis = minAgeToArchive + 1000;
+    LOG.info("Waiting for {}ms", pauseMillis);
+    Thread.sleep(pauseMillis);
+
+    LOG.info("Cleaning up MOB files");
+    cleanerChore.cleanupObsoleteMobFiles(conf, table);
+    final long remainingFiles = getNumberOfMobFiles(table, family);
+    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
+        remainingFiles);
+
+    // The scan must still return the expected number of rows after the cleanup.
+    LOG.debug("checking count of rows");
+    final long scanned = scanTable(table);
+    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
+  }
+
+  protected long getNumberOfMobFiles(TableName tableName, String family)
+      throws IOException {
+    // Count the entries in the family's MOB directory, logging each path at debug level.
+    final FileSystem fileSystem = FileSystem.get(conf);
+    final Path mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, family);
+    final FileStatus[] entries = fileSystem.listStatus(mobFamilyDir);
+    for (FileStatus entry : entries) {
+      LOG.debug("MOB Directory content: {}", entry.getPath());
+    }
+    LOG.debug("MOB Directory content total files: {}", entries.length);
+    return entries.length;
+  }
+
+
+  /**
+   * Scans family {@code fam} and asserts every row holds {@code mobVal}. Scan failures are logged
+   * and rethrown as an AssertionError with the cause, failing only the current test.
+   * @return the number of rows returned by the scanner
+   */
+  protected long scanTable(TableName tableName) {
+    try (final Table table = HTU.getConnection().getTable(tableName);
+         final ResultScanner scanner = table.getScanner(fam)) {
+      long counter = 0;
+      Result result;
+      while ((result = scanner.next()) != null) {
+        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
+        counter++;
+      }
+      return counter;
+    } catch (Exception e) {
+      LOG.error("MOB file compaction test FAILED", e);
+      throw new AssertionError("Failed to scan table " + tableName + ": " + e.getMessage(), e);
+    }
+  }
+}