Posted to commits@hbase.apache.org by jm...@apache.org on 2015/07/22 21:52:35 UTC

[17/50] [abbrv] hbase git commit: Merge remote-tracking branch 'apache/master' (5/19/15) into hbase-11339

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
index 718b513,0000000..f02da48
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@@ -1,643 -1,0 +1,636 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.KeyValueUtil;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.Tag;
- import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.*;
 +import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
 +import org.apache.hadoop.hbase.regionserver.*;
 +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.Pair;
 +
 +/**
 + * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
 + */
 +@InterfaceAudience.Private
 +public class PartitionedMobFileCompactor extends MobFileCompactor {
 +
 +  private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
 +  protected long mergeableSize;
 +  protected int delFileMaxCount;
 +  /** The number of files compacted in a batch */
 +  protected int compactionBatchSize;
 +  protected int compactionKVMax;
 +
 +  private Path tempPath;
 +  private Path bulkloadPath;
 +  private CacheConfig compactionCacheConfig;
 +  private Tag tableNameTag;
 +
 +  public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
 +    HColumnDescriptor column, ExecutorService pool) {
 +    super(conf, fs, tableName, column, pool);
 +    mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
 +      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
 +    // default is 100
 +    compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
 +    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
 +      tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
 +    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
 +      HConstants.COMPACTION_KV_MAX_DEFAULT);
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    compactionCacheConfig = new CacheConfig(copyOfConf);
 +    tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
 +  }
 +
 +  @Override
 +  public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException {
 +    if (files == null || files.isEmpty()) {
 +      LOG.info("No candidate mob files");
 +      return null;
 +    }
 +    LOG.info("isForceAllFiles: " + isForceAllFiles);
 +    // find the files to compact.
 +    PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
 +    // compact the files.
 +    return performCompaction(request);
 +  }
 +
 +  /**
 +   * Selects the mob/del files to be compacted.
 +   * Iterates over the candidates to find all the del files and small mob files.
 +   * @param candidates All the candidates.
 +   * @param isForceAllFiles Whether to add all the mob files into the compaction.
 +   * @return A compaction request.
 +   * @throws IOException
 +   */
 +  protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates,
 +    boolean isForceAllFiles) throws IOException {
 +    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
 +    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
 +      new HashMap<CompactionPartitionId, CompactionPartition>();
 +    int selectedFileCount = 0;
 +    int irrelevantFileCount = 0;
 +    for (FileStatus file : candidates) {
 +      if (!file.isFile()) {
 +        irrelevantFileCount++;
 +        continue;
 +      }
 +      // group the del files and small files.
 +      FileStatus linkedFile = file;
 +      if (HFileLink.isHFileLink(file.getPath())) {
 +        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
 +        linkedFile = getLinkedFileStatus(link);
 +        if (linkedFile == null) {
 +          // If the linked file cannot be found, regard it as an irrelevant file.
 +          irrelevantFileCount++;
 +          continue;
 +        }
 +      }
 +      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
 +        allDelFiles.add(file);
 +      } else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) {
 +        // add all files if isForceAllFiles is true,
 +        // otherwise add the small files to the merge pool
 +        MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
 +        CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
 +          fileName.getDate());
 +        CompactionPartition compactionPartition = filesToCompact.get(id);
 +        if (compactionPartition == null) {
 +          compactionPartition = new CompactionPartition(id);
 +          compactionPartition.addFile(file);
 +          filesToCompact.put(id, compactionPartition);
 +        } else {
 +          compactionPartition.addFile(file);
 +        }
 +        selectedFileCount++;
 +      }
 +    }
 +    PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
 +      filesToCompact.values(), allDelFiles);
 +    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
 +      // all the files are selected
 +      request.setCompactionType(CompactionType.ALL_FILES);
 +    }
 +    LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
 +      + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
 +      + irrelevantFileCount + " irrelevant files");
 +    return request;
 +  }
 +
 +  /**
 +   * Performs the compaction on the selected files.
 +   * <ol>
 +   * <li>Compacts the del files.</li>
 +   * <li>Compacts the selected small mob files and all the del files.</li>
 +   * <li>If all the candidates are selected, delete the del files.</li>
 +   * </ol>
 +   * @param request The compaction request.
 +   * @return The paths of new mob files generated in the compaction.
 +   * @throws IOException
 +   */
 +  protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
 +    throws IOException {
 +    // merge the del files
 +    List<Path> delFilePaths = new ArrayList<Path>();
 +    for (FileStatus delFile : request.delFiles) {
 +      delFilePaths.add(delFile.getPath());
 +    }
 +    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
 +    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
 +    for (Path newDelPath : newDelPaths) {
 +      StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
 +      newDelFiles.add(sf);
 +    }
 +    LOG.info("After merging, there are " + newDelFiles.size() + " del files");
 +    // compact the mob files by partitions.
 +    List<Path> paths = compactMobFiles(request, newDelFiles);
 +    LOG.info("After compaction, there are " + paths.size() + " mob files");
 +    // archive the del files if all the mob files are selected.
 +    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
 +      LOG.info("After a mob file compaction with all files selected, archiving the del files "
 +        + newDelFiles);
 +      try {
 +        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
 +      } catch (IOException e) {
 +        LOG.error("Failed to archive the del files " + newDelFiles, e);
 +      }
 +    }
 +    return paths;
 +  }
 +
 +  /**
 +   * Compacts the selected small mob files and all the del files.
 +   * @param request The compaction request.
 +   * @param delFiles The del files.
 +   * @return The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
 +    final List<StoreFile> delFiles) throws IOException {
 +    Collection<CompactionPartition> partitions = request.compactionPartitions;
 +    if (partitions == null || partitions.isEmpty()) {
 +      LOG.info("No partitions of mob files");
 +      return Collections.emptyList();
 +    }
 +    List<Path> paths = new ArrayList<Path>();
 +    Connection c = ConnectionFactory.createConnection(conf);
 +    final Table table = c.getTable(tableName);
 +    try {
 +      Map<CompactionPartitionId, Future<List<Path>>> results =
 +        new HashMap<CompactionPartitionId, Future<List<Path>>>();
 +      // compact the mob files by partitions in parallel.
 +      for (final CompactionPartition partition : partitions) {
 +        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
 +          @Override
 +          public List<Path> call() throws Exception {
 +            LOG.info("Compacting mob files for partition " + partition.getPartitionId());
 +            return compactMobFilePartition(request, partition, delFiles, table);
 +          }
 +        }));
 +      }
 +      // wait for the parallel compactions to finish and collect the new file paths.
 +      boolean hasFailure = false;
 +      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
 +        try {
 +          paths.addAll(result.getValue().get());
 +        } catch (Exception e) {
 +          // just log the error
 +          LOG.error("Failed to compact the partition " + result.getKey(), e);
 +          hasFailure = true;
 +        }
 +      }
 +      if (hasFailure) {
 +        // if any partition fails in the compaction, directly throw an exception.
 +        throw new IOException("Failed to compact the partitions");
 +      }
 +    } finally {
 +      try {
 +        table.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the HTable", e);
 +      }
 +      try {
 +        // close the connection as well, otherwise it leaks.
 +        c.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the Connection", e);
 +      }
 +    }
 +    return paths;
 +  }
 +
 +  /**
 +   * Compacts a partition of selected small mob files and all the del files.
 +   * @param request The compaction request.
 +   * @param partition A compaction partition.
 +   * @param delFiles The del files.
 +   * @param table The current table.
 +   * @return The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
 +    CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
 +    List<Path> newFiles = new ArrayList<Path>();
 +    List<FileStatus> files = partition.listFiles();
 +    int offset = 0;
 +    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
 +    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
 +    while (offset < files.size()) {
 +      int batch = compactionBatchSize;
 +      if (files.size() - offset < compactionBatchSize) {
 +        batch = files.size() - offset;
 +      }
 +      if (batch == 1 && delFiles.isEmpty()) {
 +        // only one file left and no del files, do not compact it,
 +        // and directly add it to the new files.
 +        newFiles.add(files.get(offset).getPath());
 +        offset++;
 +        continue;
 +      }
 +      // clean the bulkload directory to avoid loading old files.
 +      fs.delete(bulkloadPathOfPartition, true);
 +      // add the selected mob files and del files into filesToCompact
 +      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
 +      for (int i = offset; i < batch + offset; i++) {
 +        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
 +          BloomType.NONE);
 +        filesToCompact.add(sf);
 +      }
 +      filesToCompact.addAll(delFiles);
 +      // compact the mob files in a batch.
 +      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
 +        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
 +      // move to the next batch.
 +      offset += batch;
 +    }
 +    LOG.info("Compaction is finished. The number of mob files changed from " + files.size()
 +      + " to " + newFiles.size());
 +    return newFiles;
 +  }
 +
 +  /**
 +   * Compacts a partition of selected small mob files and all the del files in a batch.
 +   * @param request The compaction request.
 +   * @param partition A compaction partition.
 +   * @param table The current table.
 +   * @param filesToCompact The files to be compacted.
 +   * @param batch The number of mob files to be compacted in a batch.
 +   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
 +   *        partition is saved.
 +   * @param bulkloadColumnPath The directory where the bulkload files of current partition
 +   *        are saved.
 +   * @param newFiles The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
 +    CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
 +    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
 +    throws IOException {
 +    // open scanner to the selected mob files and del files.
 +    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
 +    // the mob files to be compacted, not including the del files.
 +    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
 +    // Pair(maxSeqId, cellsCount)
 +    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
 +    // open writers for the mob files and new ref store files.
 +    Writer writer = null;
 +    Writer refFileWriter = null;
 +    Path filePath = null;
 +    Path refFilePath = null;
 +    long mobCells = 0;
 +    try {
 +      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
 +        tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
 +          .getStartKey(), compactionCacheConfig);
 +      filePath = writer.getPath();
 +      byte[] fileName = Bytes.toBytes(filePath.getName());
 +      // create a temp file and open a writer for it in the bulkloadPath
 +      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
 +        .getSecond().longValue(), compactionCacheConfig);
 +      refFilePath = refFileWriter.getPath();
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore = false;
 +      ScannerContext scannerContext =
 +              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
 +      do {
 +        hasMore = scanner.next(cells, scannerContext);
 +        for (Cell cell : cells) {
 +          // TODO remove this after the new code is introduced.
 +          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +          // write the mob cell to the mob file.
 +          writer.append(kv);
 +          // write the new reference cell to the store file.
 +          KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
 +          refFileWriter.append(reference);
 +          mobCells++;
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      // close the scanner.
 +      scanner.close();
 +      // append metadata to the mob file, and close the mob file writer.
 +      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
 +      // append metadata and bulkload info to the ref mob file, and close the writer.
 +      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
 +    }
 +    if (mobCells > 0) {
 +      // commit mob file
 +      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
 +      // bulkload the ref file
 +      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
 +      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
 +    } else {
 +      // the mob file is empty, delete it instead of committing.
 +      deletePath(filePath);
 +      // the ref file is empty, delete it instead of committing.
 +      deletePath(refFilePath);
 +    }
 +    // archive the old mob files, do not archive the del files.
 +    try {
 +      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
 +        mobFilesToCompact);
 +    } catch (IOException e) {
 +      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
 +    }
 +  }
 +
 +  /**
 +   * Compacts the del files in batches, which avoids opening too many files.
 +   * @param request The compaction request.
 +   * @param delFilePaths The paths of the del files.
 +   * @return The paths of new del files after merging or the original files if no merging
 +   *         is necessary.
 +   * @throws IOException
 +   */
 +  protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
 +    List<Path> delFilePaths) throws IOException {
 +    if (delFilePaths.size() <= delFileMaxCount) {
 +      return delFilePaths;
 +    }
 +    // when there are more del files than the allowed count, merge them first.
 +    int offset = 0;
 +    List<Path> paths = new ArrayList<Path>();
 +    while (offset < delFilePaths.size()) {
 +      // get the batch
 +      int batch = compactionBatchSize;
 +      if (delFilePaths.size() - offset < compactionBatchSize) {
 +        batch = delFilePaths.size() - offset;
 +      }
 +      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
 +      if (batch == 1) {
 +        // only one file left, do not compact it, directly add it to the new files.
 +        paths.add(delFilePaths.get(offset));
 +        offset++;
 +        continue;
 +      }
 +      for (int i = offset; i < batch + offset; i++) {
 +        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
 +          BloomType.NONE));
 +      }
 +      // compact the del files in a batch.
 +      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
 +      // move to the next batch.
 +      offset += batch;
 +    }
 +    return compactDelFiles(request, paths);
 +  }
 +
 +  /**
 +   * Compacts the del files in a batch.
 +   * @param request The compaction request.
 +   * @param delFiles The del files.
 +   * @return The path of new del file after merging.
 +   * @throws IOException
 +   */
 +  private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
 +    List<StoreFile> delFiles) throws IOException {
 +    // create a scanner for the del files.
 +    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
 +    Writer writer = null;
 +    Path filePath = null;
 +    try {
 +      writer = MobUtils.createDelFileWriter(conf, fs, column,
 +        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
 +        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
 +      filePath = writer.getPath();
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore = false;
 +      ScannerContext scannerContext =
 +              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
 +      do {
 +        hasMore = scanner.next(cells, scannerContext);
 +        for (Cell cell : cells) {
 +          // TODO remove this after the new code is introduced.
 +          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +          writer.append(kv);
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      scanner.close();
 +      if (writer != null) {
 +        try {
 +          writer.close();
 +        } catch (IOException e) {
 +          LOG.error("Failed to close the writer of the file " + filePath, e);
 +        }
 +      }
 +    }
 +    // commit the new del file
 +    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
 +    // archive the old del files
 +    try {
 +      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
 +    } catch (IOException e) {
 +      LOG.error("Failed to archive the old del files " + delFiles, e);
 +    }
 +    return path;
 +  }
 +
 +  /**
 +   * Creates a store scanner.
 +   * @param filesToCompact The files to be compacted.
 +   * @param scanType The scan type.
 +   * @return The store scanner.
 +   * @throws IOException
 +   */
 +  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
 +    throws IOException {
 +    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
 +      false, true, false, null, HConstants.LATEST_TIMESTAMP);
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(column.getMaxVersions());
 +    long ttl = HStore.determineTTLFromFamily(column);
-     ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
++    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR);
 +    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
 +      HConstants.LATEST_TIMESTAMP);
 +    return scanner;
 +  }
 +
 +  /**
 +   * Bulkloads the current file.
 +   * @param table The current table.
 +   * @param bulkloadDirectory The path of bulkload directory.
 +   * @param fileName The current file name.
 +   * @throws IOException
 +   */
 +  private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
 +    throws IOException {
 +    // bulkload the ref file
 +    try {
 +      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
 +      bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
 +    } catch (Exception e) {
 +      // delete the committed mob file
 +      deletePath(new Path(mobFamilyDir, fileName));
 +      throw new IOException(e);
 +    } finally {
 +      // delete the bulkload files in bulkloadPath
 +      deletePath(bulkloadDirectory);
 +    }
 +  }
 +
 +  /**
 +   * Closes the mob file writer.
 +   * @param writer The mob file writer.
 +   * @param maxSeqId Maximum sequence id.
 +   * @param mobCellsCount The number of mob cells.
 +   * @throws IOException
 +   */
 +  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
 +    throws IOException {
 +    if (writer != null) {
 +      writer.appendMetadata(maxSeqId, false, mobCellsCount);
 +      try {
 +        writer.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Closes the ref file writer.
 +   * @param writer The ref file writer.
 +   * @param maxSeqId Maximum sequence id.
 +   * @param bulkloadTime The timestamp at which the bulk load file is created.
 +   * @throws IOException
 +   */
 +  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
 +    throws IOException {
 +    if (writer != null) {
 +      writer.appendMetadata(maxSeqId, false);
 +      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
 +      try {
 +        writer.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Gets the max seqId and number of cells of the store files.
 +   * @param storeFiles The store files.
 +   * @return The pair of the max seqId and number of cells of the store files.
 +   * @throws IOException
 +   */
 +  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
 +    long maxSeqId = 0;
 +    long maxKeyCount = 0;
 +    for (StoreFile sf : storeFiles) {
 +      // the readers will be closed later after the merge.
 +      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
 +      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
 +      if (count != null) {
 +        maxKeyCount += Bytes.toLong(count);
 +      }
 +    }
 +    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
 +  }
 +
 +  /**
 +   * Deletes a file.
 +   * @param path The path of the file to be deleted.
 +   */
 +  private void deletePath(Path path) {
 +    try {
 +      if (path != null) {
 +        fs.delete(path, true);
 +      }
 +    } catch (IOException e) {
 +      LOG.error("Failed to delete the file " + path, e);
 +    }
 +  }
 +
 +  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
 +    Path[] locations = link.getLocations();
 +    for (Path location : locations) {
 +      FileStatus file = getFileStatus(location);
 +      if (file != null) {
 +        return file;
 +      }
 +    }
 +    return null;
 +  }
 +
 +  private FileStatus getFileStatus(Path path) throws IOException {
 +    try {
 +      if (path != null) {
 +        FileStatus file = fs.getFileStatus(path);
 +        return file;
 +      }
 +    } catch (FileNotFoundException e) {
 +      LOG.warn("The file " + path + " can not be found", e);
 +    }
 +    return null;
 +  }
 +}
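
The select() method above groups small mob files into compaction partitions keyed by the
start key and date parsed from each file name. A minimal standalone sketch of that grouping
step (PartitionKey below is a hypothetical stand-in for MobFileName/CompactionPartitionId,
not the actual HBase types):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PartitionGroupingSketch {

  // Hypothetical stand-in for CompactionPartitionId: files sharing a
  // (startKey, date) pair belong to the same compaction partition.
  static final class PartitionKey {
    final String startKey;
    final String date;
    PartitionKey(String startKey, String date) {
      this.startKey = startKey;
      this.date = date;
    }
    @Override public boolean equals(Object o) {
      if (!(o instanceof PartitionKey)) return false;
      PartitionKey k = (PartitionKey) o;
      return startKey.equals(k.startKey) && date.equals(k.date);
    }
    @Override public int hashCode() { return Objects.hash(startKey, date); }
    @Override public String toString() { return date + "/" + startKey; }
  }

  public static void main(String[] args) {
    // Each entry mimics the (startKey, date) pair encoded in a mob file name.
    String[][] files = { { "row0", "20150501" }, { "row0", "20150501" }, { "row5", "20150502" } };
    Map<PartitionKey, List<String>> partitions = new HashMap<>();
    for (String[] f : files) {
      partitions.computeIfAbsent(new PartitionKey(f[0], f[1]), k -> new ArrayList<>())
          .add(f[0] + "_" + f[1]);
    }
    // The first two entries land in one partition, the third in another.
    partitions.forEach((k, v) -> System.out.println(k + " -> " + v.size() + " file(s)"));
  }
}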

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index d55822d,519edde..04d2b13
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@@ -63,15 -63,7 +63,15 @@@ public class DefaultStoreEngine extend
  
    @Override
    protected void createComponents(
-       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
+       Configuration conf, Store store, CellComparator kvComparator) throws IOException {
 +    createCompactor(conf, store);
 +    createCompactionPolicy(conf, store);
 +    createStoreFlusher(conf, store);
 +    storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf());
 +
 +  }
 +
 +  protected void createCompactor(Configuration conf, Store store) throws IOException {
      String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
      try {
        compactor = ReflectionUtils.instantiateWithCustomCtor(className,

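The change above splits DefaultStoreEngine.createComponents into separate protected factory
hooks (createCompactor, createCompactionPolicy, createStoreFlusher) so that a subclass such
as MobStoreEngine can swap out a single component. A minimal sketch of that template-method
pattern under illustrative names (not the actual HBase classes):

// Template method assembled from overridable factory hooks; names are illustrative.
abstract class EngineSketch {
  protected String compactor;
  protected String flusher;

  final void createComponents() {
    createCompactor();
    createStoreFlusher();
  }

  protected void createCompactor() { compactor = "DefaultCompactor"; }
  protected void createStoreFlusher() { flusher = "DefaultStoreFlusher"; }
}

// A subclass (like MobStoreEngine in this diff) overrides only the hook it needs.
class MobEngineSketch extends EngineSketch {
  @Override protected void createStoreFlusher() { flusher = "DefaultMobStoreFlusher"; }
}

public class EngineSketchDemo {
  public static void main(String[] args) {
    EngineSketch engine = new MobEngineSketch();
    engine.createComponents();
    System.out.println(engine.compactor + " + " + engine.flusher);
  }
}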
http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index b4d2213,0000000..e4bdc74
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@@ -1,560 -1,0 +1,558 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.regionserver;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Date;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.NavigableSet;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.*;
 +import org.apache.hadoop.hbase.KeyValue.KVComparator;
 +import org.apache.hadoop.hbase.KeyValue.Type;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.filter.Filter;
 +import org.apache.hadoop.hbase.filter.FilterList;
 +import org.apache.hadoop.hbase.io.compress.Compression;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFile;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.master.TableLockManager;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.MobCacheConfig;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFile;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobStoreEngine;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 +import org.apache.hadoop.hbase.util.Bytes;
++import org.apache.hadoop.hbase.util.ChecksumType;
 +import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 +import org.apache.hadoop.hbase.util.IdLock;
 +
 +/**
 + * The store implementation to save MOBs (medium objects); it extends HStore.
 + * When the descriptor of a column family has the value "IS_MOB", the column family
 + * is a mob one. When an HRegion instantiates a store for this column family, an
 + * HMobStore is created.
 + * HMobStore is almost the same as HStore except that it uses different types of scanners.
 + * In the getScanner method, a MobStoreScanner or ReversedMobStoreScanner is returned. In
 + * these scanners, an additional seek in the mob files is performed after the seek in
 + * HBase is done.
 + */
 +@InterfaceAudience.Private
 +public class HMobStore extends HStore {
- 
++  private static final Log LOG = LogFactory.getLog(HMobStore.class);
 +  private MobCacheConfig mobCacheConfig;
 +  private Path homePath;
 +  private Path mobFamilyPath;
 +  private volatile long mobCompactedIntoMobCellsCount = 0;
 +  private volatile long mobCompactedFromMobCellsCount = 0;
 +  private volatile long mobCompactedIntoMobCellsSize = 0;
 +  private volatile long mobCompactedFromMobCellsSize = 0;
 +  private volatile long mobFlushCount = 0;
 +  private volatile long mobFlushedCellsCount = 0;
 +  private volatile long mobFlushedCellsSize = 0;
 +  private volatile long mobScanCellsCount = 0;
 +  private volatile long mobScanCellsSize = 0;
 +  private HColumnDescriptor family;
 +  private TableLockManager tableLockManager;
 +  private TableName tableLockName;
 +  private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>();
 +  private final IdLock keyLock = new IdLock();
 +
 +  public HMobStore(final HRegion region, final HColumnDescriptor family,
 +      final Configuration confParam) throws IOException {
 +    super(region, family, confParam);
 +    this.family = family;
 +    this.mobCacheConfig = (MobCacheConfig) cacheConf;
 +    this.homePath = MobUtils.getMobHome(conf);
 +    this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
 +        family.getNameAsString());
 +    List<Path> locations = new ArrayList<Path>(2);
 +    locations.add(mobFamilyPath);
 +    TableName tn = region.getTableDesc().getTableName();
 +    locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
 +        .getEncodedName(), family.getNameAsString()));
 +    map.put(Bytes.toString(tn.getName()), locations);
 +    if (region.getRegionServerServices() != null) {
 +      tableLockManager = region.getRegionServerServices().getTableLockManager();
 +      tableLockName = MobUtils.getTableLockName(getTableName());
 +    }
 +  }
 +
 +  /**
 +   * Creates the mob cache config.
 +   */
 +  @Override
 +  protected void createCacheConf(HColumnDescriptor family) {
 +    cacheConf = new MobCacheConfig(conf, family);
 +  }
 +
 +  /**
 +   * Gets current config.
 +   */
 +  public Configuration getConfiguration() {
 +    return this.conf;
 +  }
 +
 +  /**
 +   * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, an additional
 +   * seek in the mob files is performed after the seek in HBase is done.
 +   */
 +  @Override
 +  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
 +      long readPt, KeyValueScanner scanner) throws IOException {
 +    if (scanner == null) {
 +      if (MobUtils.isRefOnlyScan(scan)) {
 +        Filter refOnlyFilter = new MobReferenceOnlyFilter();
 +        Filter filter = scan.getFilter();
 +        if (filter != null) {
 +          scan.setFilter(new FilterList(filter, refOnlyFilter));
 +        } else {
 +          scan.setFilter(refOnlyFilter);
 +        }
 +      }
 +      scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
 +          targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
 +    }
 +    return scanner;
 +  }
 +
 +  /**
 +   * Creates the mob store engine.
 +   */
 +  @Override
 +  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
-       KVComparator kvComparator) throws IOException {
++      CellComparator cellComparator) throws IOException {
 +    MobStoreEngine engine = new MobStoreEngine();
-     engine.createComponents(conf, store, kvComparator);
++    engine.createComponents(conf, store, cellComparator);
 +    return engine;
 +  }
 +
 +  /**
 +   * Gets the temp directory.
 +   * @return The temp directory.
 +   */
 +  private Path getTempDir() {
 +    return new Path(homePath, MobConstants.TEMP_DIR_NAME);
 +  }
 +
 +  /**
 +   * Creates the writer for the mob file in temp directory.
 +   * @param date The latest date of written cells.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey) throws IOException {
 +    if (startKey == null) {
 +      startKey = HConstants.EMPTY_START_ROW;
 +    }
 +    Path path = getTempDir();
 +    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
 +  }
 +
 +  /**
 +   * Creates the writer for the del file in temp directory.
 +   * The del file keeps track of the delete markers. Its name has the suffix _del;
 +   * the format is [0-9a-f]+(_del)?.
 +   * @param date The latest date of written cells.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @return The writer for the del file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey) throws IOException {
 +    if (startKey == null) {
 +      startKey = HConstants.EMPTY_START_ROW;
 +    }
 +    Path path = getTempDir();
 +    String suffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
 +    MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
 +    return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
 +  }
 +
 +  /**
 +   * Creates the writer for the mob file in temp directory.
 +   * @param date The date string, its format is yyyymmdd.
 +   * @param basePath The basic path for a temp directory.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey) throws IOException {
 +    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
 +        .toString().replaceAll("-", ""));
 +    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
 +  }
 +
 +  /**
 +   * Creates the writer for the mob file in temp directory.
 +   * @param mobFileName The mob file name.
 +   * @param basePath The basic path for a temp directory.
 +   * @param maxKeyCount The key count.
 +   * @param compression The compression algorithm.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression) throws IOException {
 +    final CacheConfig writerCacheConf = mobCacheConfig;
 +    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
 +        .withIncludesMvcc(true).withIncludesTags(true)
-         .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
++        .withChecksumType(ChecksumType.getDefaultChecksumType())
 +        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
 +        .withBlockSize(getFamily().getBlocksize())
 +        .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
 +        .withEncryptionContext(cryptoContext).build();
 +
 +    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
 +        .withFilePath(new Path(basePath, mobFileName.getFileName()))
-         .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
++        .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
 +        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
 +    return w;
 +  }
 +
 +  /**
 +   * Commits the mob file.
 +   * @param sourceFile The source file.
 +   * @param targetPath The directory path where the source file is renamed to.
 +   * @throws IOException
 +   */
 +  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
 +    if (sourceFile == null) {
 +      return;
 +    }
 +    Path dstPath = new Path(targetPath, sourceFile.getName());
 +    validateMobFile(sourceFile);
 +    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
 +    LOG.info(msg);
 +    Path parent = dstPath.getParent();
 +    if (!region.getFilesystem().exists(parent)) {
 +      region.getFilesystem().mkdirs(parent);
 +    }
 +    if (!region.getFilesystem().rename(sourceFile, dstPath)) {
 +      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
 +    }
 +  }
 +
 +  /**
 +   * Validates a mob file by opening and closing it.
 +   *
 +   * @param path the path to the mob file
 +   */
 +  private void validateMobFile(Path path) throws IOException {
 +    StoreFile storeFile = null;
 +    try {
 +      storeFile =
 +          new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
 +      storeFile.createReader();
 +    } catch (IOException e) {
 +      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
 +      throw e;
 +    } finally {
 +      if (storeFile != null) {
 +        storeFile.closeReader(false);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Reads the cell from the mob file; the read point is ignored.
 +   * @param reference The reference cell found in HBase; its value is the path to a mob file.
 +   * @param cacheBlocks Whether the scanner should cache blocks.
 +   * @return The cell found in the mob file.
 +   * @throws IOException
 +   */
 +  public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
 +    return resolve(reference, cacheBlocks, -1);
 +  }
 +
 +  /**
 +   * Reads the cell from the mob file.
 +   * @param reference The reference cell found in HBase; its value is the path to a mob file.
 +   * @param cacheBlocks Whether the scanner should cache blocks.
 +   * @param readPt the read point.
 +   * @return The cell found in the mob file.
 +   * @throws IOException
 +   */
 +  public Cell resolve(Cell reference, boolean cacheBlocks, long readPt) throws IOException {
 +    Cell result = null;
 +    if (MobUtils.hasValidMobRefCellValue(reference)) {
 +      String fileName = MobUtils.getMobFileName(reference);
 +      Tag tableNameTag = MobUtils.getTableNameTag(reference);
 +      if (tableNameTag != null) {
 +        byte[] tableName = tableNameTag.getValue();
 +        String tableNameString = Bytes.toString(tableName);
 +        List<Path> locations = map.get(tableNameString);
 +        if (locations == null) {
 +          IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
 +          try {
 +            locations = map.get(tableNameString);
 +            if (locations == null) {
 +              locations = new ArrayList<Path>(2);
 +              TableName tn = TableName.valueOf(tableName);
 +              locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
 +              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
 +                  .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
 +              map.put(tableNameString, locations);
 +            }
 +          } finally {
 +            keyLock.releaseLockEntry(lockEntry);
 +          }
 +        }
 +        result = readCell(locations, fileName, reference, cacheBlocks, readPt);
 +      }
 +    }
 +    if (result == null) {
 +      LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row, family,"
 +          + " qualifier, timestamp, type and tags but with an empty value to return.");
 +      result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
 +          reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
 +          reference.getFamilyLength(), reference.getQualifierArray(),
 +          reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
 +          Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
 +          0, 0, reference.getTagsArray(), reference.getTagsOffset(),
 +          reference.getTagsLength());
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Reads the cell from a mob file.
 +   * The mob file might be located in different directories.
 +   * 1. The working directory.
 +   * 2. The archive directory.
 +   * Reads the cell from the files located in both of the above directories.
 +   * @param locations The possible locations where the mob files are saved.
 +   * @param fileName The file to be read.
 +   * @param search The cell to be searched.
 +   * @param cacheMobBlocks Whether the scanner should cache blocks.
 +   * @param readPt the read point.
 +   * @return The found cell. Null if there's no such cell.
 +   * @throws IOException
 +   */
 +  private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
 +    long readPt) throws IOException {
 +    FileSystem fs = getFileSystem();
 +    for (Path location : locations) {
 +      MobFile file = null;
 +      Path path = new Path(location, fileName);
 +      try {
 +        file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
 +        return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
 +          cacheMobBlocks);
 +      } catch (IOException e) {
 +        mobCacheConfig.getMobFileCache().evictFile(fileName);
 +        if ((e instanceof FileNotFoundException) ||
 +            (e.getCause() instanceof FileNotFoundException)) {
 +          LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
 +        } else {
 +          throw e;
 +        }
 +      } catch (NullPointerException e) {
 +        mobCacheConfig.getMobFileCache().evictFile(fileName);
 +        LOG.warn("Fail to read the cell", e);
 +      } catch (AssertionError e) {
 +        mobCacheConfig.getMobFileCache().evictFile(fileName);
 +        LOG.warn("Fail to read the cell", e);
 +      } finally {
 +        if (file != null) {
 +          mobCacheConfig.getMobFileCache().closeFile(file);
 +        }
 +      }
 +    }
 +    LOG.error("The mob file " + fileName + " could not be found in the locations "
 +        + locations);
 +    return null;
 +  }
 +
 +  /**
 +   * Gets the mob file path.
 +   * @return The mob file path.
 +   */
 +  public Path getPath() {
 +    return mobFamilyPath;
 +  }
 +
 +  /**
 +   * The compaction in the mob store.
 +   * The cells in this store contain the paths of the mob files. There might be a race
 +   * condition between a major compaction and the sweeping of the mob files.
 +   * In order to avoid this, the major compaction and the sweeping of the mob files must be
 +   * mutually exclusive.
 +   * The minor compaction is not affected.
 +   * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress.
 +   */
 +  @Override
 +  public List<StoreFile> compact(CompactionContext compaction,
 +      CompactionThroughputController throughputController) throws IOException {
 +    // If it's a major compaction, check whether a sweeper is running.
 +    // If yes, mark the major compaction as retainDeleteMarkers.
 +    if (compaction.getRequest().isAllFiles()) {
 +      // Use ZooKeeper to coordinate.
 +      // 1. Acquire an operation lock.
 +      //   1.1. If it cannot be acquired, mark the major compaction as retainDeleteMarkers
 +      //        and continue the compaction.
 +      //   1.2. If the lock is obtained, search for the sweeping node.
 +      //      1.2.1. If the node is there, a sweeping is in progress; mark the major
 +      //             compaction as retainDeleteMarkers and continue the compaction.
 +      //      1.2.2. If the node is not there, add a child to the major compaction node and
 +      //             run the compaction directly.
 +      TableLock lock = null;
 +      if (tableLockManager != null) {
 +        lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
 +      }
 +      boolean tableLocked = false;
 +      String tableName = getTableName().getNameAsString();
 +      if (lock != null) {
 +        try {
 +          LOG.info("Start to acquire a read lock for the table[" + tableName
 +              + "], ready to perform the major compaction");
 +          lock.acquire();
 +          tableLocked = true;
 +        } catch (Exception e) {
 +          LOG.error("Fail to lock the table " + tableName, e);
 +        }
 +      } else {
 +        // If the tableLockManager is null, mark tableLocked as true.
 +        tableLocked = true;
 +      }
 +      try {
 +        if (!tableLocked) {
 +          LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
 +              + tableName + "], forcing the delete markers to be retained");
 +          compaction.getRequest().forceRetainDeleteMarkers();
 +        }
 +        return super.compact(compaction, throughputController);
 +      } finally {
 +        if (tableLocked && lock != null) {
 +          try {
 +            lock.release();
 +          } catch (IOException e) {
 +            LOG.error("Fail to release the table lock " + tableName, e);
 +          }
 +        }
 +      }
 +    } else {
 +      // If it's not a major compaction, continue the compaction.
 +      return super.compact(compaction, throughputController);
 +    }
 +  }
 +
 +  public void updateMobCompactedIntoMobCellsCount(long count) {
 +    mobCompactedIntoMobCellsCount += count;
 +  }
 +
 +  public long getMobCompactedIntoMobCellsCount() {
 +    return mobCompactedIntoMobCellsCount;
 +  }
 +
 +  public void updateMobCompactedFromMobCellsCount(long count) {
 +    mobCompactedFromMobCellsCount += count;
 +  }
 +
 +  public long getMobCompactedFromMobCellsCount() {
 +    return mobCompactedFromMobCellsCount;
 +  }
 +
 +  public void updateMobCompactedIntoMobCellsSize(long size) {
 +    mobCompactedIntoMobCellsSize += size;
 +  }
 +
 +  public long getMobCompactedIntoMobCellsSize() {
 +    return mobCompactedIntoMobCellsSize;
 +  }
 +
 +  public void updateMobCompactedFromMobCellsSize(long size) {
 +    mobCompactedFromMobCellsSize += size;
 +  }
 +
 +  public long getMobCompactedFromMobCellsSize() {
 +    return mobCompactedFromMobCellsSize;
 +  }
 +
 +  public void updateMobFlushCount() {
 +    mobFlushCount++;
 +  }
 +
 +  public long getMobFlushCount() {
 +    return mobFlushCount;
 +  }
 +
 +  public void updateMobFlushedCellsCount(long count) {
 +    mobFlushedCellsCount += count;
 +  }
 +
 +  public long getMobFlushedCellsCount() {
 +    return mobFlushedCellsCount;
 +  }
 +
 +  public void updateMobFlushedCellsSize(long size) {
 +    mobFlushedCellsSize += size;
 +  }
 +
 +  public long getMobFlushedCellsSize() {
 +    return mobFlushedCellsSize;
 +  }
 +
 +  public void updateMobScanCellsCount(long count) {
 +    mobScanCellsCount += count;
 +  }
 +
 +  public long getMobScanCellsCount() {
 +    return mobScanCellsCount;
 +  }
 +
 +  public void updateMobScanCellsSize(long size) {
 +    mobScanCellsSize += size;
 +  }
 +
 +  public long getMobScanCellsSize() {
 +    return mobScanCellsSize;
 +  }
 +}
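
The resolve() method above fills its per-table locations cache with a check / lock / re-check
pattern keyed on the table name. A compact sketch of that idiom; ReentrantLock stands in for
HBase's IdLock, and the paths are made up for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class LocationsCacheSketch {
  private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

  List<String> locationsFor(String table) {
    List<String> locations = cache.get(table);
    if (locations == null) {
      // Per-key lock, a stand-in for IdLock keyed on the table name's hash.
      ReentrantLock lock = locks.computeIfAbsent(table, t -> new ReentrantLock());
      lock.lock();
      try {
        // Re-check after acquiring the lock: another thread may have filled the entry.
        locations = cache.get(table);
        if (locations == null) {
          // Two candidate locations, mirroring the working and archive directories.
          locations = Arrays.asList("mob/" + table, "archive/" + table);
          cache.put(table, locations);
        }
      } finally {
        lock.unlock();
      }
    }
    return locations;
  }

  public static void main(String[] args) {
    System.out.println(new LocationsCacheSketch().locationsFor("t1"));
  }
}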

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ad7ce98,5d7248d..552ffd2
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@@ -55,8 -55,8 +55,9 @@@ import org.apache.hadoop.hbase.HColumnD
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValueUtil;
  import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.KeyValue.KVComparator;
  import org.apache.hadoop.hbase.Tag;
  import org.apache.hadoop.hbase.TagType;
  import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@@ -344,27 -344,6 +345,27 @@@ public class HStore implements Store 
    }
  
    /**
 +   * Creates the cache config.
 +   * @param family The current column family.
 +   */
 +  protected void createCacheConf(final HColumnDescriptor family) {
 +    this.cacheConf = new CacheConfig(conf, family);
 +  }
 +
 +  /**
 +   * Creates the store engine configured for the given Store.
 +   * @param store The store. An unfortunate dependency needed due to it
 +   *              being passed to coprocessors via the compactor.
 +   * @param conf Store configuration.
 +   * @param kvComparator CellComparator for storeFileManager.
 +   * @return StoreEngine to use.
 +   */
 +  protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
-       KVComparator kvComparator) throws IOException {
++      CellComparator kvComparator) throws IOException {
 +    return StoreEngine.create(store, conf, comparator);
 +  }
 +
 +  /**
     * @param family
     * @return TTL in seconds of the specified family
     */

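The new protected createCacheConf and createStoreEngine hooks are what let HMobStore (earlier
in this diff) substitute MobCacheConfig and MobStoreEngine without duplicating HStore's setup
logic. A minimal sketch of the constructor-hook pattern under hypothetical names:

// Illustrative sketch only; StoreSketch/MobStoreSketch are not HBase classes.
class StoreSketch {
  protected String cacheConf;

  StoreSketch() {
    // The constructor delegates to the hook, so a subclass decides which config is built.
    // (Calling an overridable method from a constructor works here because the
    // override touches no subclass state.)
    createCacheConf();
  }

  protected void createCacheConf() { cacheConf = "CacheConfig"; }
}

class MobStoreSketch extends StoreSketch {
  @Override protected void createCacheConf() { cacheConf = "MobCacheConfig"; }
}

public class StoreSketchDemo {
  public static void main(String[] args) {
    System.out.println(new MobStoreSketch().cacheConf); // prints MobCacheConfig
  }
}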
http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 090be8c,bc8dd01..68ce76a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@@ -98,10 -98,13 +98,11 @@@ public class DefaultCompactor extends C
            smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
            cleanSeqId = true;
          }
+ 
 -        // When all MVCC readpoints are 0, don't write them.
 -        // See HBASE-8166, HBASE-12600, and HBASE-13389.
 -        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
 -          fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
 +        writer = createTmpWriter(fd, smallestReadPoint);
          boolean finished =
 -            performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
 +            performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController,
 +                    request.isAllFiles());
          if (!finished) {
            writer.close();
            store.getFileSystem().delete(writer.getPath(), false);
@@@ -145,20 -148,6 +146,24 @@@
    }
  
    /**
 +   * Creates a writer for a new file in a temporary directory.
 +   * @param fd The file details.
 +   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
 +   * @return Writer for a new StoreFile in the tmp dir.
 +   * @throws IOException
 +   */
-   protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
-       throws IOException {
++  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
++      throws IOException {
++    // fd.maxMVCCReadpoint >= 0 is always true, so MVCC readpoints are written
++    // unconditionally here (see HBASE-8166, HBASE-12600 and HBASE-13389);
++    // tags are written only when the input files contain tags.
 +    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-         true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
++        true, fd.maxMVCCReadpoint >= 0, fd.maxTagsLength > 0);
 +    return writer;
 +  }
 +
 +  /**
     * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
     * {@link #compact(CompactionRequest, CompactionThroughputController)};
     * @param filesToCompact the files to compact. These are used as the compactionSelection for
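
createTmpWriter is factored out for the same reason: a compactor subclass can
return a writer with different MVCC/tag settings than the default. A rough
sketch under that assumption (the subclass is hypothetical; createWriterInTmp
is the same store call used in the hunk):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;

    // Hypothetical compactor that always writes tags, e.g. for cells that
    // gain reference tags during compaction.
    public class AlwaysTaggedCompactor extends DefaultCompactor {

      public AlwaysTaggedCompactor(Configuration conf, Store store) {
        super(conf, store);
      }

      @Override
      protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
          throws IOException {
        // Force includesTag=true so cells carrying tags always fit,
        // regardless of what the selected input files contained.
        return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
            true, fd.maxMVCCReadpoint >= 0, true);
      }
    }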

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
index ed3853e,0000000..544d145
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
@@@ -1,441 -1,0 +1,436 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.RejectedExecutionHandler;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.HBaseTestingUtility;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.*;
 +import org.apache.hadoop.hbase.KeyValue.Type;
 +import org.apache.hadoop.hbase.regionserver.*;
 +import org.apache.hadoop.hbase.testclassification.LargeTests;
- import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +import org.apache.hadoop.hbase.util.Threads;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +@Category(LargeTests.class)
 +public class TestPartitionedMobFileCompactor {
 +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 +  private final static String family = "family";
 +  private final static String qf = "qf";
 +  private HColumnDescriptor hcd = new HColumnDescriptor(family);
 +  private Configuration conf = TEST_UTIL.getConfiguration();
 +  private CacheConfig cacheConf = new CacheConfig(conf);
 +  private FileSystem fs;
 +  private List<FileStatus> mobFiles = new ArrayList<>();
 +  private List<FileStatus> delFiles = new ArrayList<>();
 +  private List<FileStatus> allFiles = new ArrayList<>();
 +  private Path basePath;
 +  private String mobSuffix;
 +  private String delSuffix;
 +  private static ExecutorService pool;
 +
 +  @BeforeClass
 +  public static void setUpBeforeClass() throws Exception {
 +    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
 +    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
 +    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
 +    TEST_UTIL.startMiniCluster(1);
 +    pool = createThreadPool();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    pool.shutdown();
 +    TEST_UTIL.shutdownMiniCluster();
 +  }
 +
 +  private void init(String tableName) throws Exception {
 +    fs = FileSystem.get(conf);
 +    Path testDir = FSUtils.getRootDir(conf);
 +    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
 +    basePath = new Path(new Path(mobTestDir, tableName), family);
 +    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
 +    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithAllFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithAllFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for (FileStatus file : mobFiles) {
 +      if (file.getLen() < mergeSize) {
 +        String fileName = file.getPath().getName();
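 +        // The leading 32 hex chars of a mob file name are assumed to be the
 +        // MD5 of the start key, which identifies the compaction partition.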
 +        String startKey = fileName.substring(0, 32);
 +        expectedStartKeys.add(startKey);
 +      }
 +    }
 +    testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithPartFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithPartFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = 4000;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for (FileStatus file : mobFiles) {
 +      if (file.getLen() < mergeSize) {
 +        String fileName = file.getPath().getName();
 +        String startKey = fileName.substring(0, 32);
 +        expectedStartKeys.add(startKey);
 +      }
 +    }
 +    // set the mob file compaction mergeable threshold
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +    testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactionSelectWithForceAllFiles() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactionSelectWithForceAllFiles";
 +    init(tableName);
 +    int count = 10;
 +    // create 10 mob files.
 +    createStoreFiles(basePath, family, qf, count, Type.Put);
 +    // create 10 del files
 +    createStoreFiles(basePath, family, qf, count, Type.Delete);
 +    listFiles();
 +    long mergeSize = 4000;
 +    List<String> expectedStartKeys = new ArrayList<>();
 +    for (FileStatus file : mobFiles) {
 +      String fileName = file.getPath().getName();
 +      String startKey = fileName.substring(0, 32);
 +      expectedStartKeys.add(startKey);
 +    }
 +    // set the mob file compaction mergeable threshold
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
 +    testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithDefaultBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
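 +    // Assuming the default batch size covers all inputs, one pass merges the
 +    // 13 del files into a single file that still holds all 13 del cells.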
 +    testCompactDelFiles(tableName, 1, 13, false);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithSmallBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +
 +    // set the mob file compaction batch size
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
 +    testCompactDelFiles(tableName, 1, 13, false);
 +  }
 +
 +  @Test
 +  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
 +    resetConf();
 +    String tableName = "testCompactDelFilesWithSmallBatchSize";
 +    init(tableName);
 +    // create 20 mob files.
 +    createStoreFiles(basePath, family, qf, 20, Type.Put);
 +    // create 13 del files
 +    createStoreFiles(basePath, family, qf, 13, Type.Delete);
 +    listFiles();
 +
 +    // set the max del file count
 +    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
 +    // set the mob file compaction batch size
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
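 +    // Assuming batched merging, 13 del files compact two-at-a-time per pass
 +    // (13 -> 7 -> 4) until the count is within the max del file count of 5,
 +    // leaving 4 files that together hold the original 13 del cells.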
 +    testCompactDelFiles(tableName, 4, 13, false);
 +  }
 +
 +  /**
 +   * Tests the file selection in select().
 +   * @param tableName the table name
 +   * @param type the expected compaction type
 +   * @param isForceAllFiles whether all the mob files are selected
 +   * @param expected the expected start keys
 +   */
 +  private void testSelectFiles(String tableName, final CompactionType type,
 +    final boolean isForceAllFiles, final List<String> expected) throws IOException {
 +    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
 +      TableName.valueOf(tableName), hcd, pool) {
 +      @Override
 +      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
 +        throws IOException {
 +        if (files == null || files.isEmpty()) {
 +          return null;
 +        }
 +        PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
 +        // assert the compaction type
 +        Assert.assertEquals(type, request.type);
 +        // assert get the right partitions
 +        compareCompactedPartitions(expected, request.compactionPartitions);
 +        // assert get the right del files
 +        compareDelFiles(request.delFiles);
 +        return null;
 +      }
 +    };
 +    compactor.compact(allFiles, isForceAllFiles);
 +  }
 +
 +  /**
 +   * Tests compactDelFiles().
 +   * @param tableName the table name
 +   * @param expectedFileCount the expected file count
 +   * @param expectedCellCount the expected cell count
 +   * @param isForceAllFiles whether all the mob files are selected
 +   */
 +  private void testCompactDelFiles(String tableName, final int expectedFileCount,
 +      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
 +    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
 +      TableName.valueOf(tableName), hcd, pool) {
 +      @Override
 +      protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
 +          throws IOException {
 +        List<Path> delFilePaths = new ArrayList<Path>();
 +        for (FileStatus delFile : request.delFiles) {
 +          delFilePaths.add(delFile.getPath());
 +        }
 +        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
 +        // assert the del files are merged.
 +        Assert.assertEquals(expectedFileCount, newDelPaths.size());
 +        Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
 +        return null;
 +      }
 +    };
 +    compactor.compact(allFiles, isForceAllFiles);
 +  }
 +
 +  /**
 +   * Lists the files in the path
 +   */
 +  private void listFiles() throws IOException {
 +    for (FileStatus file : fs.listStatus(basePath)) {
 +      allFiles.add(file);
 +      if (file.getPath().getName().endsWith("_del")) {
 +        delFiles.add(file);
 +      } else {
 +        mobFiles.add(file);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Compares the compacted partitions with the expected start keys.
 +   * @param expected the expected start keys
 +   * @param partitions the collection of CompactionPartitions
 +   */
 +  private void compareCompactedPartitions(List<String> expected,
 +      Collection<CompactionPartition> partitions) {
 +    List<String> actualKeys = new ArrayList<>();
 +    for (CompactionPartition partition : partitions) {
 +      actualKeys.add(partition.getPartitionId().getStartKey());
 +    }
 +    Collections.sort(expected);
 +    Collections.sort(actualKeys);
 +    Assert.assertEquals(expected.size(), actualKeys.size());
 +    for (int i = 0; i < expected.size(); i++) {
 +      Assert.assertEquals(expected.get(i), actualKeys.get(i));
 +    }
 +  }
 +
 +  /**
 +   * Compares the del files.
 +   * @param allDelFiles all the del files
 +   */
 +  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
 +    int i = 0;
 +    for (FileStatus file : allDelFiles) {
 +      Assert.assertEquals(delFiles.get(i), file);
 +      i++;
 +    }
 +  }
 +
 +  /**
 +   * Creates store files.
 +   * @param basePath the path where the files are created
 +   * @param family the family name
 +   * @param qualifier the column qualifier
 +   * @param count the number of store files to create
 +   * @param type the key type
 +   */
 +  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
 +      Type type) throws IOException {
 +    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
 +    String startKey = "row_";
 +    MobFileName mobFileName = null;
 +    for (int i = 0; i < count; i++) {
 +      byte[] startRow = Bytes.toBytes(startKey + i);
 +      if (type.equals(Type.Delete)) {
 +        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), delSuffix);
 +      } else if (type.equals(Type.Put)) {
 +        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), mobSuffix);
 +      }
 +      StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 +          .withFileContext(meta)
 +          .withFilePath(new Path(basePath, mobFileName.getFileName()))
 +          .build();
 +      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
 +          type, (i+1)*1000);
 +    }
 +  }
 +
 +  /**
 +   * Writes data to store file.
 +   * @param writer the store file writer
 +   * @param row the row key
 +   * @param family the family name
 +   * @param qualifier the column qualifier
 +   * @param type the key type
 +   * @param size the size of value
 +   */
 +  private static void writeStoreFile(final StoreFile.Writer writer, byte[] row, byte[] family,
 +      byte[] qualifier, Type type, int size) throws IOException {
 +    long now = System.currentTimeMillis();
 +    try {
 +      byte[] dummyData = new byte[size];
 +      new Random().nextBytes(dummyData);
 +      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
 +    } finally {
 +      writer.close();
 +    }
 +  }
 +
 +  /**
 +   * Counts the del cells in the del files.
 +   * @param paths the del file paths
 +   * @return the number of del cells
 +   */
 +  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
 +    List<StoreFile> sfs = new ArrayList<StoreFile>();
 +    int size = 0;
 +    for (Path path : paths) {
 +      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
 +      sfs.add(sf);
 +    }
 +    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
 +        false, null, HConstants.LATEST_TIMESTAMP);
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(hcd.getMaxVersions());
 +    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
 +    long ttl = HStore.determineTTLFromFamily(hcd);
-     ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
++    ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR);
 +    StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
 +        scanners, 0L, HConstants.LATEST_TIMESTAMP);
 +    List<Cell> results = new ArrayList<>();
 +    boolean hasMore = true;
 +
 +    while (hasMore) {
 +      hasMore = scanner.next(results);
 +      size += results.size();
 +      results.clear();
 +    }
 +    scanner.close();
 +    return size;
 +  }
 +
 +  private static ExecutorService createThreadPool() {
 +    int maxThreads = 10;
 +    long keepAliveTime = 60;
 +    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
 +    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
 +      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
 +      new RejectedExecutionHandler() {
 +        @Override
 +        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 +          try {
 +            // waiting for a thread to pick up instead of throwing exceptions.
 +            queue.put(r);
 +          } catch (InterruptedException e) {
 +            throw new RejectedExecutionException(e);
 +          }
 +        }
 +      });
 +    pool.allowCoreThreadTimeOut(true);
 +    return pool;
 +  }
 +
 +  /**
 +   * Resets the configuration.
 +   */
 +  private void resetConf() {
 +    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
 +    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +  }
 +}
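
One detail of the test worth calling out: createThreadPool pairs a
zero-capacity SynchronousQueue with a rejection handler that re-offers the
task through queue.put(r), turning pool exhaustion into caller blocking
instead of a RejectedExecutionException. The same pattern in isolation, as a
self-contained sketch (class and task names are invented for the demo):

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BlockingPoolDemo {
      public static void main(String[] args) throws Exception {
        final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, queue,
            new RejectedExecutionHandler() {
              @Override
              public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                  // No queue capacity and all workers busy: park the caller
                  // until a worker polls the queue for its next task.
                  queue.put(r);
                } catch (InterruptedException e) {
                  throw new RejectedExecutionException(e);
                }
              }
            });
        for (int i = 0; i < 5; i++) {
          final int id = i;
          // Submissions beyond the two worker threads block here, not fail.
          pool.execute(new Runnable() {
            @Override
            public void run() {
              try {
                Thread.sleep(100); // keep workers busy so later submits must wait
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              System.out.println("task " + id + " on " + Thread.currentThread().getName());
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
      }
    }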