Posted to commits@hbase.apache.org by an...@apache.org on 2015/05/28 10:13:46 UTC

[5/6] hbase git commit: HBASE-13763 Handle the rename, annotation and typo stuff in MOB. (Jingcheng)

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 464a0e7..dd33cda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -77,43 +77,43 @@ public class MobConstants {
   public final static String EMPTY_STRING = "";
   /**
    * If the size of a mob file is less than this value, it's regarded as a small file and needs to
-   * be merged in mob file compaction. The default value is 192MB.
+   * be merged in mob compaction. The default value is 192MB.
    */
-  public static final String MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD =
-    "hbase.mob.file.compaction.mergeable.threshold";
-  public static final long DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
+  public static final String MOB_COMPACTION_MERGEABLE_THRESHOLD =
+    "hbase.mob.compaction.mergeable.threshold";
+  public static final long DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
   /**
-   * The max number of del files that is allowed in the mob file compaction. In the mob file
+   * The max number of del files that is allowed in the mob file compaction. In the mob
    * compaction, when the number of existing del files is larger than this value, they are merged
   * until the number of del files is not larger than this value. The default value is 3.
    */
   public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
   public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3;
   /**
-   * The max number of the mob files that is allowed in a batch of the mob file compaction.
-   * The mob file compaction merges the small mob files to bigger ones. If the number of the
+   * The max number of the mob files that is allowed in a batch of the mob compaction.
+   * The mob compaction merges the small mob files into bigger ones. If the number of the
   * small files is very large, it could lead to "too many open file handles" in the merge,
   * so the merge has to be split into batches. This value limits the number of mob files
-   * that are selected in a batch of the mob file compaction. The default value is 100.
+   * that are selected in a batch of the mob compaction. The default value is 100.
    */
-  public static final String MOB_FILE_COMPACTION_BATCH_SIZE =
-    "hbase.mob.file.compaction.batch.size";
-  public static final int DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE = 100;
+  public static final String MOB_COMPACTION_BATCH_SIZE =
+    "hbase.mob.compaction.batch.size";
+  public static final int DEFAULT_MOB_COMPACTION_BATCH_SIZE = 100;
   /**
-   * The period that MobFileCompactionChore runs. The unit is millisecond.
+   * The period that MobCompactionChore runs. The unit is seconds.
    * The default value is one week.
    */
-  public static final String MOB_FILE_COMPACTION_CHORE_PERIOD =
-    "hbase.mob.file.compaction.chore.period";
-  public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD =
+  public static final String MOB_COMPACTION_CHORE_PERIOD =
+    "hbase.mob.compaction.chore.period";
+  public static final int DEFAULT_MOB_COMPACTION_CHORE_PERIOD =
     24 * 60 * 60 * 7; // a week
-  public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class";
+  public static final String MOB_COMPACTOR_CLASS_KEY = "hbase.mob.compactor.class";
   /**
-   * The max number of threads used in MobFileCompactor.
+   * The max number of threads used in MobCompactor.
    */
-  public static final String MOB_FILE_COMPACTION_THREADS_MAX =
-    "hbase.mob.file.compaction.threads.max";
-  public static final int DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX = 1;
+  public static final String MOB_COMPACTION_THREADS_MAX =
+    "hbase.mob.compaction.threads.max";
+  public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;
   private MobConstants() {
 
   }
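
For readers tracking the rename: the configuration keys drop the "file" token ("hbase.mob.file.compaction.*" becomes "hbase.mob.compaction.*"). A minimal sketch of how the renamed keys might be tuned from client code follows; the class name and values are illustrative, not recommendations:

    import org.apache.hadoop.conf.Configuration;

    public class MobCompactionConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Files under 128MB count as "small" and are merged (default 192MB).
        conf.setLong("hbase.mob.compaction.mergeable.threshold", 128L * 1024 * 1024);
        // Merge del files once more than 3 of them exist (the default).
        conf.setInt("hbase.mob.delfile.max.count", 3);
        // Compact at most 100 mob files per batch (the default).
        conf.setInt("hbase.mob.compaction.batch.size", 100);
        // Run the chore daily instead of weekly; note the unit is now seconds.
        conf.setInt("hbase.mob.compaction.chore.period", 24 * 60 * 60);
        // Allow up to 4 compaction threads.
        conf.setInt("hbase.mob.compaction.threads.max", 4);
        // Optionally swap in a different MobCompactor implementation.
        conf.set("hbase.mob.compactor.class",
            "org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor");
      }
    }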

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
index 7d8c9a5..0780f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -177,7 +177,7 @@ public class MobFileCache {
           evictedFileCount.incrementAndGet();
         }
       } catch (IOException e) {
-        LOG.error("Fail to evict the file " + fileName, e);
+        LOG.error("Failed to evict the file " + fileName, e);
       } finally {
         if (lockEntry != null) {
           keyLock.releaseLockEntry(lockEntry);
@@ -249,7 +249,7 @@ public class MobFileCache {
 
   public void shutdown() {
     this.scheduleThreadPool.shutdown();
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 100; i++) {
       if (!this.scheduleThreadPool.isShutdown()) {
         try {
           Thread.sleep(10);
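
The loop bound change above stretches the post-shutdown wait from roughly 100ms (10 x 10ms sleeps) to roughly 1s (100 x 10ms). A sleep-free alternative, sketched here on the assumption that blocking briefly in shutdown() is acceptable, would lean on ExecutorService#awaitTermination; the class name is illustrative:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    final class PoolShutdownSketch {
      /** Waits up to one second for in-flight tasks before forcing shutdown. */
      static void shutdown(ExecutorService pool) {
        pool.shutdown();
        try {
          if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // grace period elapsed, cancel stragglers
          }
        } catch (InterruptedException e) {
          pool.shutdownNow();
          Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
      }
    }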

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
index 937e965..796fe4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.hbase.util.MD5Hash;
 * It consists of an md5 of a start key, a date and a uuid.
  * It looks like md5(start) + date + uuid.
  * <ol>
- * <li>0-31 characters: md5 hex string of a start key. Since the length of the start key is not
+ * <li>characters 0-31: md5 hex string of a start key. Since the length of the start key is not
  * fixed, we have to use the md5 instead, which has a fixed length.</li>
- * <li>32-39 characters: a string of a date with format yyyymmdd. The date is the latest timestamp
+ * <li>characters 32-39: a string of a date with format yyyymmdd. The date is the latest timestamp
  * of cells in this file.</li>
  * <li>the remaining characters: the uuid.</li>
  * </ol>
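
A quick illustration of that fixed layout: the first 32 hex characters are the md5 of the start key, the next 8 are the yyyymmdd date, and the rest is the uuid. The file name below is made up for the example:

    public class MobFileNameLayoutSketch {
      public static void main(String[] args) {
        String name = "d41d8cd98f00b204e9800998ecf8427e"   // characters 0-31: md5 of start key
            + "20150528"                                   // characters 32-39: yyyymmdd date
            + "c56a418065aa426cb9b8a263cbfb84fa";          // remaining characters: uuid
        String startKeyMd5 = name.substring(0, 32);
        String date = name.substring(32, 40);
        String uuid = name.substring(40);
        System.out.println(startKeyMd5 + " / " + date + " / " + uuid);
      }
    }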

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
index 2d5f1ad..a54660c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -43,6 +43,6 @@ public class MobStoreEngine extends DefaultStoreEngine {
    */
   @Override
   protected void createCompactor(Configuration conf, Store store) throws IOException {
-    compactor = new DefaultMobCompactor(conf, store);
+    compactor = new DefaultMobStoreCompactor(conf, store);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index bbdc47a..c40767c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
@@ -63,8 +64,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
-import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
+import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -249,7 +250,7 @@ public class MobUtils {
     try {
       stats = fs.listStatus(path);
     } catch (FileNotFoundException e) {
-      LOG.warn("Fail to find the mob file " + path, e);
+      LOG.warn("Failed to find the mob file " + path, e);
     }
     if (null == stats) {
       // no file found
@@ -287,7 +288,7 @@ public class MobUtils {
             filesToClean);
         deletedFileCount = filesToClean.size();
       } catch (IOException e) {
-        LOG.error("Fail to delete the mob files " + filesToClean, e);
+        LOG.error("Failed to delete the mob files " + filesToClean, e);
       }
     }
     LOG.info(deletedFileCount + " expired mob files are deleted");
@@ -555,7 +556,7 @@ public class MobUtils {
   }
 
   /**
-   * Creates a writer for the del file in temp directory.
+   * Creates a writer for the mob file in the temp directory.
    * @param conf The current configuration.
    * @param fs The current file system.
    * @param family The descriptor of the current column family.
@@ -631,7 +632,7 @@ public class MobUtils {
       storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
       storeFile.createReader();
     } catch (IOException e) {
-      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+      LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
       throw e;
     } finally {
       if (storeFile != null) {
@@ -692,22 +693,22 @@ public class MobUtils {
   }
 
   /**
-   * Performs the mob file compaction.
+   * Performs the mob compaction.
    * @param conf the Configuration
    * @param fs the file system
   * @param tableName the table to compact
    * @param hcd the column descriptor
    * @param pool the thread pool
    * @param tableLockManager the tableLock manager
-   * @param isForceAllFiles Whether add all mob files into the compaction.
+   * @param allFiles Whether to add all mob files into the compaction.
    */
-  public static void doMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName,
+  public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
     HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager,
-    boolean isForceAllFiles) throws IOException {
-    String className = conf.get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
-      PartitionedMobFileCompactor.class.getName());
-    // instantiate the mob file compactor.
-    MobFileCompactor compactor = null;
+    boolean allFiles) throws IOException {
+    String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
+      PartitionedMobCompactor.class.getName());
+    // instantiate the mob compactor.
+    MobCompactor compactor = null;
     try {
       compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
         Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
@@ -724,21 +725,21 @@ public class MobUtils {
       // the tableLockManager might be null in testing. In that case, it is lock-free.
       if (tableLockManager != null) {
         lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName),
-          "Run MobFileCompaction");
+          "Run MobCompactor");
         lock.acquire();
       }
       tableLocked = true;
-      compactor.compact(isForceAllFiles);
+      compactor.compact(allFiles);
     } catch (Exception e) {
-      LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString()
+      LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
         + " in the table " + tableName.getNameAsString(), e);
     } finally {
       if (lock != null && tableLocked) {
         try {
           lock.release();
         } catch (IOException e) {
-          LOG.error("Fail to release the write lock for the table " + tableName.getNameAsString(),
-            e);
+          LOG.error(
+            "Failed to release the write lock for the table " + tableName.getNameAsString(), e);
         }
       }
     }
@@ -749,15 +750,15 @@ public class MobUtils {
    * @param conf the Configuration
    * @return A thread pool.
    */
-  public static ExecutorService createMobFileCompactorThreadPool(Configuration conf) {
-    int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_THREADS_MAX,
-      MobConstants.DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX);
+  public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
+    int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
+      MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
     if (maxThreads == 0) {
       maxThreads = 1;
     }
     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
-      Threads.newDaemonThreadFactory("MobFileCompactor"), new RejectedExecutionHandler() {
+      Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
           try {
@@ -839,4 +840,19 @@ public class MobUtils {
     }
     return cryptoContext;
   }
+
+  /**
+   * Checks whether this table has mob-enabled columns.
+   * @param htd The current table descriptor.
+   * @return Whether this table has mob-enabled columns.
+   */
+  public static boolean hasMobColumns(HTableDescriptor htd) {
+    HColumnDescriptor[] hcds = htd.getColumnFamilies();
+    for (HColumnDescriptor hcd : hcds) {
+      if (hcd.isMobEnabled()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
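
A usage sketch for the new hasMobColumns helper; it assumes HColumnDescriptor exposes a setMobEnabled setter mirroring the isMobEnabled getter used above, and the table and family names are made up:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mob.MobUtils;

    public class HasMobColumnsSketch {
      public static void main(String[] args) {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
        HColumnDescriptor mobFamily = new HColumnDescriptor("f");
        mobFamily.setMobEnabled(true);              // assumed setter
        htd.addFamily(mobFamily);
        htd.addFamily(new HColumnDescriptor("g"));  // a plain, non-mob family
        System.out.println(MobUtils.hasMobColumns(htd)); // true: one family is mob-enabled
      }
    }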

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
new file mode 100644
index 0000000..5d162b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * The compaction request for mob files.
+ */
+@InterfaceAudience.Private
+public abstract class MobCompactionRequest {
+
+  protected long selectionTime;
+  protected CompactionType type = CompactionType.PART_FILES;
+
+  public void setCompactionType(CompactionType type) {
+    this.type = type;
+  }
+
+  /**
+   * Gets the selection time.
+   * @return The selection time.
+   */
+  public long getSelectionTime() {
+    return this.selectionTime;
+  }
+
+  /**
+   * Gets the compaction type.
+   * @return The compaction type.
+   */
+  public CompactionType getCompactionType() {
+    return type;
+  }
+
+  protected enum CompactionType {
+
+    /**
+     * Part of the mob files are selected.
+     */
+    PART_FILES,
+
+    /**
+     * All of the mob files are selected.
+     */
+    ALL_FILES;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
new file mode 100644
index 0000000..156c6f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * A mob compactor to directly compact the mob files.
+ */
+@InterfaceAudience.Private
+public abstract class MobCompactor {
+
+  protected FileSystem fs;
+  protected Configuration conf;
+  protected TableName tableName;
+  protected HColumnDescriptor column;
+
+  protected Path mobTableDir;
+  protected Path mobFamilyDir;
+  protected ExecutorService pool;
+
+  public MobCompactor(Configuration conf, FileSystem fs, TableName tableName,
+    HColumnDescriptor column, ExecutorService pool) {
+    this.conf = conf;
+    this.fs = fs;
+    this.tableName = tableName;
+    this.column = column;
+    this.pool = pool;
+    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
+  }
+
+  /**
+   * Compacts the mob files for the current column family.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public List<Path> compact() throws IOException {
+    return compact(false);
+  }
+
+  /**
+   * Compacts the mob files by compaction type for the current column family.
+   * @param allFiles Whether to add all mob files into the compaction.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public List<Path> compact(boolean allFiles) throws IOException {
+    return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles);
+  }
+
+  /**
+   * Compacts the candidate mob files.
+   * @param files The candidate mob files.
+   * @param allFiles Whether to add all mob files into the compaction.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public abstract List<Path> compact(List<FileStatus> files, boolean allFiles)
+    throws IOException;
+}
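
A sketch of how a caller might drive this internal API, mirroring what MobUtils.doMobCompaction does (table locking and error handling omitted; the table and family names are made up):

    import java.util.List;
    import java.util.concurrent.ExecutorService;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;

    public class MobCompactorUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        ExecutorService pool = MobUtils.createMobCompactorThreadPool(conf);
        try {
          PartitionedMobCompactor compactor = new PartitionedMobCompactor(
              conf, fs, TableName.valueOf("t1"), new HColumnDescriptor("f"), pool);
          // true forces all mob files, not only the small ones, into the compaction.
          List<Path> newFiles = compactor.compact(true);
          System.out.println("New mob files: " + newFiles);
        } finally {
          pool.shutdown();
        }
      }
    }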

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
new file mode 100644
index 0000000..af1eb4a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
@@ -0,0 +1,146 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * An implementation of {@link MobCompactionRequest} that is used in
+ * {@link PartitionedMobCompactor}.
+ * The mob files that have the same start key and date in their names belong to
+ * the same partition.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobCompactionRequest extends MobCompactionRequest {
+
+  protected Collection<FileStatus> delFiles;
+  protected Collection<CompactionPartition> compactionPartitions;
+
+  public PartitionedMobCompactionRequest(Collection<CompactionPartition> compactionPartitions,
+    Collection<FileStatus> delFiles) {
+    this.selectionTime = EnvironmentEdgeManager.currentTime();
+    this.compactionPartitions = compactionPartitions;
+    this.delFiles = delFiles;
+  }
+
+  /**
+   * Gets the compaction partitions.
+   * @return The compaction partitions.
+   */
+  public Collection<CompactionPartition> getCompactionPartitions() {
+    return this.compactionPartitions;
+  }
+
+  /**
+   * Gets the del files.
+   * @return The del files.
+   */
+  public Collection<FileStatus> getDelFiles() {
+    return this.delFiles;
+  }
+
+  /**
+   * The partition in the mob compaction.
+   * The mob files that have the same start key and date in their names belong to
+   * the same partition.
+   */
+  protected static class CompactionPartition {
+    private List<FileStatus> files = new ArrayList<FileStatus>();
+    private CompactionPartitionId partitionId;
+
+    public CompactionPartition(CompactionPartitionId partitionId) {
+      this.partitionId = partitionId;
+    }
+
+    public CompactionPartitionId getPartitionId() {
+      return this.partitionId;
+    }
+
+    public void addFile(FileStatus file) {
+      files.add(file);
+    }
+
+    public List<FileStatus> listFiles() {
+      return Collections.unmodifiableList(files);
+    }
+  }
+
+  /**
+   * The partition id that consists of start key and date of the mob file name.
+   */
+  public static class CompactionPartitionId {
+
+    private String startKey;
+    private String date;
+
+    public CompactionPartitionId(String startKey, String date) {
+      if (startKey == null || date == null) {
+        throw new IllegalArgumentException("Neither start key nor date can be null");
+      }
+      this.startKey = startKey;
+      this.date = date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 17;
+      result = 31 * result + startKey.hashCode();
+      result = 31 * result + date.hashCode();
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof CompactionPartitionId)) {
+        return false;
+      }
+      CompactionPartitionId another = (CompactionPartitionId) obj;
+      if (!this.startKey.equals(another.startKey)) {
+        return false;
+      }
+      if (!this.date.equals(another.date)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder(startKey).append(date).toString();
+    }
+  }
+}
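
The value semantics of CompactionPartitionId are what make the grouping work: two mob files whose names carry the same md5 and date resolve to equal ids, and therefore land in the same HashMap bucket during selection. A quick check (the key and date are made up):

    import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;

    public class PartitionIdSketch {
      public static void main(String[] args) {
        CompactionPartitionId a =
            new CompactionPartitionId("d41d8cd98f00b204e9800998ecf8427e", "20150528");
        CompactionPartitionId b =
            new CompactionPartitionId("d41d8cd98f00b204e9800998ecf8427e", "20150528");
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
      }
    }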

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
new file mode 100644
index 0000000..065787e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -0,0 +1,655 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An implementation of {@link MobCompactor} that compacts the mob files in partitions.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobCompactor extends MobCompactor {
+
+  private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class);
+  protected long mergeableSize;
+  protected int delFileMaxCount;
+  /** The number of files compacted in a batch */
+  protected int compactionBatchSize;
+  protected int compactionKVMax;
+
+  private Path tempPath;
+  private Path bulkloadPath;
+  private CacheConfig compactionCacheConfig;
+  private Tag tableNameTag;
+  private Encryption.Context cryptoContext = Encryption.Context.NONE;
+
+  public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
+    HColumnDescriptor column, ExecutorService pool) throws IOException {
+    super(conf, fs, tableName, column, pool);
+    mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
+    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
+      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+    // default is 100
+    compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
+    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
+      tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
+    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
+      HConstants.COMPACTION_KV_MAX_DEFAULT);
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    compactionCacheConfig = new CacheConfig(copyOfConf);
+    tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
+    cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column);
+  }
+
+  @Override
+  public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
+    if (files == null || files.isEmpty()) {
+      LOG.info("No candidate mob files");
+      return null;
+    }
+    LOG.info("is allFiles: " + allFiles);
+    // find the files to compact.
+    PartitionedMobCompactionRequest request = select(files, allFiles);
+    // compact the files.
+    return performCompaction(request);
+  }
+
+  /**
+   * Selects the mob/del files to compact.
+   * Iterates the candidates to find out all the del files and small mob files.
+   * @param candidates All the candidates.
+   * @param allFiles Whether to add all mob files into the compaction.
+   * @return A compaction request.
+   * @throws IOException
+   */
+  protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
+    boolean allFiles) throws IOException {
+    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
+    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
+      new HashMap<CompactionPartitionId, CompactionPartition>();
+    int selectedFileCount = 0;
+    int irrelevantFileCount = 0;
+    for (FileStatus file : candidates) {
+      if (!file.isFile()) {
+        irrelevantFileCount++;
+        continue;
+      }
+      // group the del files and small files.
+      FileStatus linkedFile = file;
+      if (HFileLink.isHFileLink(file.getPath())) {
+        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
+        linkedFile = getLinkedFileStatus(link);
+        if (linkedFile == null) {
+          // If the linked file cannot be found, regard it as an irrelevant file
+          irrelevantFileCount++;
+          continue;
+        }
+      }
+      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+        allDelFiles.add(file);
+      } else if (allFiles || linkedFile.getLen() < mergeableSize) {
+        // add all files if allFiles is true,
+        // otherwise add the small files to the merge pool
+        MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
+        CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
+          fileName.getDate());
+        CompactionPartition compactionPartition = filesToCompact.get(id);
+        if (compactionPartition == null) {
+          compactionPartition = new CompactionPartition(id);
+          compactionPartition.addFile(file);
+          filesToCompact.put(id, compactionPartition);
+        } else {
+          compactionPartition.addFile(file);
+        }
+        selectedFileCount++;
+      }
+    }
+    PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
+      filesToCompact.values(), allDelFiles);
+    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
+      // all the files are selected
+      request.setCompactionType(CompactionType.ALL_FILES);
+    }
+    LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
+      + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
+      + irrelevantFileCount + " irrelevant files");
+    return request;
+  }
+
+  /**
+   * Performs the compaction on the selected files.
+   * <ol>
+   * <li>Compacts the del files.</li>
+   * <li>Compacts the selected small mob files and all the del files.</li>
+   * <li>If all the candidates are selected, delete the del files.</li>
+   * </ol>
+   * @param request The compaction request.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
+    throws IOException {
+    // merge the del files
+    List<Path> delFilePaths = new ArrayList<Path>();
+    for (FileStatus delFile : request.delFiles) {
+      delFilePaths.add(delFile.getPath());
+    }
+    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+    for (Path newDelPath : newDelPaths) {
+      StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+      newDelFiles.add(sf);
+    }
+    LOG.info("After merging, there are " + newDelFiles.size() + " del files");
+    // compact the mob files by partitions.
+    List<Path> paths = compactMobFiles(request, newDelFiles);
+    LOG.info("After compaction, there are " + paths.size() + " mob files");
+    // archive the del files if all the mob files are selected.
+    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+      LOG.info("After a mob compaction with all files selected, archiving the del files "
+        + newDelPaths);
+      try {
+        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+      } catch (IOException e) {
+        LOG.error("Failed to archive the del files " + newDelPaths, e);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Compacts the selected small mob files and all the del files.
+   * @param request The compaction request.
+   * @param delFiles The del files.
+   * @return The paths of new mob files after compactions.
+   * @throws IOException
+   */
+  protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
+    final List<StoreFile> delFiles) throws IOException {
+    Collection<CompactionPartition> partitions = request.compactionPartitions;
+    if (partitions == null || partitions.isEmpty()) {
+      LOG.info("No partitions of mob files");
+      return Collections.emptyList();
+    }
+    List<Path> paths = new ArrayList<Path>();
+    Connection c = ConnectionFactory.createConnection(conf);
+    final Table table = c.getTable(tableName);
+    try {
+      Map<CompactionPartitionId, Future<List<Path>>> results =
+        new HashMap<CompactionPartitionId, Future<List<Path>>>();
+      // compact the mob files by partitions in parallel.
+      for (final CompactionPartition partition : partitions) {
+        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+          @Override
+          public List<Path> call() throws Exception {
+            LOG.info("Compacting mob files for partition " + partition.getPartitionId());
+            return compactMobFilePartition(request, partition, delFiles, table);
+          }
+        }));
+      }
+      // compact the partitions in parallel.
+      List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
+      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+        try {
+          paths.addAll(result.getValue().get());
+        } catch (Exception e) {
+          // just log the error
+          LOG.error("Failed to compact the partition " + result.getKey(), e);
+          failedPartitions.add(result.getKey());
+        }
+      }
+      if (!failedPartitions.isEmpty()) {
+        // if any partition fails in the compaction, directly throw an exception.
+        throw new IOException("Failed to compact the partitions " + failedPartitions);
+      }
+    } finally {
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the HTable", e);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Compacts a partition of selected small mob files and all the del files.
+   * @param request The compaction request.
+   * @param partition A compaction partition.
+   * @param delFiles The del files.
+   * @param table The current table.
+   * @return The paths of new mob files after compactions.
+   * @throws IOException
+   */
+  private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
+    CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
+    List<Path> newFiles = new ArrayList<Path>();
+    List<FileStatus> files = partition.listFiles();
+    int offset = 0;
+    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+    while (offset < files.size()) {
+      int batch = compactionBatchSize;
+      if (files.size() - offset < compactionBatchSize) {
+        batch = files.size() - offset;
+      }
+      if (batch == 1 && delFiles.isEmpty()) {
+        // only one file left and no del files, do not compact it,
+        // and directly add it to the new files.
+        newFiles.add(files.get(offset).getPath());
+        offset++;
+        continue;
+      }
+      // clean the bulkload directory to avoid loading old files.
+      fs.delete(bulkloadPathOfPartition, true);
+      // add the selected mob files and del files into filesToCompact
+      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+      for (int i = offset; i < batch + offset; i++) {
+        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+          BloomType.NONE);
+        filesToCompact.add(sf);
+      }
+      filesToCompact.addAll(delFiles);
+      // compact the mob files in a batch.
+      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+      // move to the next batch.
+      offset += batch;
+    }
+    LOG.info("Compaction is finished. The number of mob files is changed from " + files.size()
+      + " to " + newFiles.size());
+    return newFiles;
+  }
+
+  /**
+   * Compacts a partition of selected small mob files and all the del files in a batch.
+   * @param request The compaction request.
+   * @param partition A compaction partition.
+   * @param table The current table.
+   * @param filesToCompact The files to be compacted.
+   * @param batch The number of mob files to be compacted in a batch.
+   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
+   *        partition is saved.
+   * @param bulkloadColumnPath The directory where the bulkload files of current partition
+   *        are saved.
+   * @param newFiles The paths of new mob files after compactions.
+   * @throws IOException
+   */
+  private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
+    CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
+    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+    throws IOException {
+    // open scanner to the selected mob files and del files.
+    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+    // the mob files to be compacted, not including the del files.
+    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+    // Pair(maxSeqId, cellsCount)
+    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+    // open writers for the mob files and new ref store files.
+    Writer writer = null;
+    Writer refFileWriter = null;
+    Path filePath = null;
+    Path refFilePath = null;
+    long mobCells = 0;
+    try {
+      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+        tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+          .getStartKey(), compactionCacheConfig, cryptoContext);
+      filePath = writer.getPath();
+      byte[] fileName = Bytes.toBytes(filePath.getName());
+      // create a temp file and open a writer for it in the bulkloadPath
+      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+        .getSecond().longValue(), compactionCacheConfig, cryptoContext);
+      refFilePath = refFileWriter.getPath();
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore = false;
+      ScannerContext scannerContext =
+              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+      do {
+        hasMore = scanner.next(cells, scannerContext);
+        for (Cell cell : cells) {
+          // write the mob cell to the mob file.
+          writer.append(cell);
+          // write the new reference cell to the store file.
+          KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
+          refFileWriter.append(reference);
+          mobCells++;
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      // close the scanner.
+      scanner.close();
+      // append metadata to the mob file, and close the mob file writer.
+      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+      // append metadata and bulkload info to the ref mob file, and close the writer.
+      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+    }
+    if (mobCells > 0) {
+      // commit mob file
+      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+      // bulkload the ref file
+      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+    } else {
+      // remove the new files
+      // the mob file is empty, delete it instead of committing.
+      deletePath(filePath);
+      // the ref file is empty, delete it instead of committing.
+      deletePath(refFilePath);
+    }
+    // archive the old mob files, do not archive the del files.
+    try {
+      MobUtils
+        .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+    }
+  }
+
+  /**
+   * Compacts the del files in batches, which avoids opening too many files.
+   * @param request The compaction request.
+   * @param delFilePaths The paths of the del files.
+   * @return The paths of new del files after merging or the original files if no merging
+   *         is necessary.
+   * @throws IOException
+   */
+  protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
+    List<Path> delFilePaths) throws IOException {
+    if (delFilePaths.size() <= delFileMaxCount) {
+      return delFilePaths;
+    }
+    // when there are more del files than the number that is allowed, merge them first.
+    int offset = 0;
+    List<Path> paths = new ArrayList<Path>();
+    while (offset < delFilePaths.size()) {
+      // get the batch
+      int batch = compactionBatchSize;
+      if (delFilePaths.size() - offset < compactionBatchSize) {
+        batch = delFilePaths.size() - offset;
+      }
+      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+      if (batch == 1) {
+        // only one file left, do not compact it, directly add it to the new files.
+        paths.add(delFilePaths.get(offset));
+        offset++;
+        continue;
+      }
+      for (int i = offset; i < batch + offset; i++) {
+        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+          BloomType.NONE));
+      }
+      // compact the del files in a batch.
+      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+      // move to the next batch.
+      offset += batch;
+    }
+    return compactDelFiles(request, paths);
+  }
+
+  /**
+   * Compacts the del file in a batch.
+   * @param request The compaction request.
+   * @param delFiles The del files.
+   * @return The path of new del file after merging.
+   * @throws IOException
+   */
+  private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
+    List<StoreFile> delFiles) throws IOException {
+    // create a scanner for the del files.
+    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+    Writer writer = null;
+    Path filePath = null;
+    try {
+      writer = MobUtils.createDelFileWriter(conf, fs, column,
+        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
+        cryptoContext);
+      filePath = writer.getPath();
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore = false;
+      ScannerContext scannerContext =
+              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+      do {
+        hasMore = scanner.next(cells, scannerContext);
+        for (Cell cell : cells) {
+          writer.append(cell);
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      scanner.close();
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.error("Failed to close the writer of the file " + filePath, e);
+        }
+      }
+    }
+    // commit the new del file
+    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+    // archive the old del files
+    try {
+      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the old del files " + delFiles, e);
+    }
+    return path;
+  }
+
+  /**
+   * Creates a store scanner.
+   * @param filesToCompact The files to be compacted.
+   * @param scanType The scan type.
+   * @return The store scanner.
+   * @throws IOException
+   */
+  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+    throws IOException {
+    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+      false, true, false, null, HConstants.LATEST_TIMESTAMP);
+    Scan scan = new Scan();
+    scan.setMaxVersions(column.getMaxVersions());
+    long ttl = HStore.determineTTLFromFamily(column);
+    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR);
+    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+      HConstants.LATEST_TIMESTAMP);
+    return scanner;
+  }
+
+  /**
+   * Bulkloads the current file.
+   * @param table The current table.
+   * @param bulkloadDirectory The path of bulkload directory.
+   * @param fileName The current file name.
+   * @throws IOException
+   */
+  private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
+    throws IOException {
+    // bulkload the ref file
+    try {
+      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
+    } catch (Exception e) {
+      // delete the committed mob file
+      deletePath(new Path(mobFamilyDir, fileName));
+      throw new IOException(e);
+    } finally {
+      // delete the bulkload files in bulkloadPath
+      deletePath(bulkloadDirectory);
+    }
+  }
+
+  /**
+   * Closes the mob file writer.
+   * @param writer The mob file writer.
+   * @param maxSeqId Maximum sequence id.
+   * @param mobCellsCount The number of mob cells.
+   * @throws IOException
+   */
+  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+    throws IOException {
+    if (writer != null) {
+      writer.appendMetadata(maxSeqId, false, mobCellsCount);
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the ref file writer.
+   * @param writer The ref file writer.
+   * @param maxSeqId Maximum sequence id.
+   * @param bulkloadTime The timestamp at which the bulk load file is created.
+   * @throws IOException
+   */
+  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+    throws IOException {
+    if (writer != null) {
+      writer.appendMetadata(maxSeqId, false);
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+      }
+    }
+  }
+
+  /**
+   * Gets the max seqId and number of cells of the store files.
+   * @param storeFiles The store files.
+   * @return The pair of the max seqId and number of cells of the store files.
+   * @throws IOException
+   */
+  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
+    long maxSeqId = 0;
+    long maxKeyCount = 0;
+    for (StoreFile sf : storeFiles) {
+      // the readers will be closed later after the merge.
+      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
+      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+      if (count != null) {
+        maxKeyCount += Bytes.toLong(count);
+      }
+    }
+    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+  }
+
+  /**
+   * Deletes a file.
+   * @param path The path of the file to be deleted.
+   */
+  private void deletePath(Path path) {
+    try {
+      if (path != null) {
+        fs.delete(path, true);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to delete the file " + path, e);
+    }
+  }
+
+  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
+    Path[] locations = link.getLocations();
+    for (Path location : locations) {
+      FileStatus file = getFileStatus(location);
+      if (file != null) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+    try {
+      if (path != null) {
+        FileStatus file = fs.getFileStatus(path);
+        return file;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.warn("The file " + path + " can not be found", e);
+    }
+    return null;
+  }
+}
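
To make the recursion in compactDelFiles concrete, here is a standalone sketch of the batching arithmetic with no HBase types: each pass merges batches of up to compactionBatchSize del files into one file apiece (a batch of one is carried over unmerged), and passes repeat until no more than delFileMaxCount files remain. With the defaults (batch size 100, max count 3), 250 del files collapse to 3 in a single pass:

    public class DelFileBatchingSketch {
      /** Simulates the pass structure of compactDelFiles. */
      static int passes(int files, int batchSize, int maxCount) {
        int passes = 0;
        while (files > maxCount) {
          int remaining = 0;
          for (int offset = 0; offset < files; ) {
            int batch = Math.min(batchSize, files - offset);
            remaining++;          // each batch yields one file (or carries one over)
            offset += batch;
          }
          files = remaining;
          passes++;
        }
        return passes;
      }

      public static void main(String[] args) {
        System.out.println(passes(250, 100, 3));  // 1 pass: 250 -> 3
        System.out.println(passes(1000, 100, 3)); // 2 passes: 1000 -> 10 -> 1
      }
    }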

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
deleted file mode 100644
index 375ba8c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.filecompactions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * The compaction request for mob files.
- */
-@InterfaceAudience.Private
-public abstract class MobFileCompactionRequest {
-
-  protected long selectionTime;
-  protected CompactionType type = CompactionType.PART_FILES;
-
-  public void setCompactionType(CompactionType type) {
-    this.type = type;
-  }
-
-  /**
-   * Gets the selection time.
-   * @return The selection time.
-   */
-  public long getSelectionTime() {
-    return this.selectionTime;
-  }
-
-  /**
-   * Gets the compaction type.
-   * @return The compaction type.
-   */
-  public CompactionType getCompactionType() {
-    return type;
-  }
-
-  protected enum CompactionType {
-
-    /**
-     * Part of mob files are selected.
-     */
-    PART_FILES,
-
-    /**
-     * All of mob files are selected.
-     */
-    ALL_FILES;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
deleted file mode 100644
index fcb39c5..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.filecompactions;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * A mob file compactor to directly compact the mob files.
- */
-@InterfaceAudience.Private
-public abstract class MobFileCompactor {
-
-  protected FileSystem fs;
-  protected Configuration conf;
-  protected TableName tableName;
-  protected HColumnDescriptor column;
-
-  protected Path mobTableDir;
-  protected Path mobFamilyDir;
-  protected ExecutorService pool;
-
-  public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
-    HColumnDescriptor column, ExecutorService pool) {
-    this.conf = conf;
-    this.fs = fs;
-    this.tableName = tableName;
-    this.column = column;
-    this.pool = pool;
-    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
-    mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
-  }
-
-  /**
-   * Compacts the mob files for the current column family.
-   * @return The paths of new mob files generated in the compaction.
-   * @throws IOException
-   */
-  public List<Path> compact() throws IOException {
-    return compact(false);
-  }
-
-  /**
-   * Compacts the mob files by compaction type for the current column family.
-   * @param isForceAllFiles Whether to add all the mob files into the compaction.
-   * @return The paths of new mob files generated in the compaction.
-   * @throws IOException
-   */
-  public List<Path> compact(boolean isForceAllFiles) throws IOException {
-    return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), isForceAllFiles);
-  }
-
-  /**
-   * Compacts the candidate mob files.
-   * @param files The candidate mob files.
-   * @param isForceAllFiles Whether to add all the mob files into the compaction.
-   * @return The paths of new mob files generated in the compaction.
-   * @throws IOException
-   */
-  public abstract List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
-    throws IOException;
-}
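
Likewise removed, the abstract compactor funnels its two convenience compact() overloads into the file-list variant that subclasses implement. A hedged sketch of how a caller might drive it, assuming the pre-rename PartitionedMobFileCompactor subclass from this package (the driver class, pool size, and method name are illustrative):

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;

public class CompactorDriver {
  static List<Path> compactFamily(Configuration conf, FileSystem fs,
      TableName tableName, HColumnDescriptor column, boolean forceAllFiles)
      throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      MobFileCompactor compactor =
          new PartitionedMobFileCompactor(conf, fs, tableName, column, pool);
      // compact(true) forces every mob file of the family into the run;
      // compact(false) lets the policy select only the mergeable files.
      return compactor.compact(forceAllFiles);
    } finally {
      pool.shutdown();
    }
  }
}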

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
deleted file mode 100644
index d2ac1db..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.filecompactions;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * An implementation of {@link MobFileCompactionRequest} that is used in
- * {@link PartitionedMobFileCompactor}.
- * The mob files that have the same start key and date in their names belong to
- * the same partition.
- */
-@InterfaceAudience.Private
-public class PartitionedMobFileCompactionRequest extends MobFileCompactionRequest {
-
-  protected Collection<FileStatus> delFiles;
-  protected Collection<CompactionPartition> compactionPartitions;
-
-  public PartitionedMobFileCompactionRequest(Collection<CompactionPartition> compactionPartitions,
-    Collection<FileStatus> delFiles) {
-    this.selectionTime = EnvironmentEdgeManager.currentTime();
-    this.compactionPartitions = compactionPartitions;
-    this.delFiles = delFiles;
-  }
-
-  /**
-   * Gets the compaction partitions.
-   * @return The compaction partitions.
-   */
-  public Collection<CompactionPartition> getCompactionPartitions() {
-    return this.compactionPartitions;
-  }
-
-  /**
-   * Gets the del files.
-   * @return The del files.
-   */
-  public Collection<FileStatus> getDelFiles() {
-    return this.delFiles;
-  }
-
-  /**
-   * The partition in the mob file compaction.
-   * The mob files that have the same start key and date in their names belong to
-   * the same partition.
-   */
-  protected static class CompactionPartition {
-    private List<FileStatus> files = new ArrayList<FileStatus>();
-    private CompactionPartitionId partitionId;
-
-    public CompactionPartition(CompactionPartitionId partitionId) {
-      this.partitionId = partitionId;
-    }
-
-    public CompactionPartitionId getPartitionId() {
-      return this.partitionId;
-    }
-
-    public void addFile(FileStatus file) {
-      files.add(file);
-    }
-
-    public List<FileStatus> listFiles() {
-      return Collections.unmodifiableList(files);
-    }
-  }
-
-  /**
-   * The partition id, which consists of the start key and the date parsed from the mob file name.
-   */
-  protected static class CompactionPartitionId {
-
-    private String startKey;
-    private String date;
-
-    public CompactionPartitionId(String startKey, String date) {
-      if (startKey == null || date == null) {
-        throw new IllegalArgumentException("Neither the start key nor the date can be null");
-      }
-      this.startKey = startKey;
-      this.date = date;
-    }
-
-    public String getStartKey() {
-      return this.startKey;
-    }
-
-    public String getDate() {
-      return this.date;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = 17;
-      result = 31 * result + startKey.hashCode();
-      result = 31 * result + date.hashCode();
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (!(obj instanceof CompactionPartitionId)) {
-        return false;
-      }
-      CompactionPartitionId another = (CompactionPartitionId) obj;
-      if (!this.startKey.equals(another.startKey)) {
-        return false;
-      }
-      if (!this.date.equals(another.date)) {
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return new StringBuilder(startKey).append(date).toString();
-    }
-  }
-}
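
Finally, the hashCode/equals pair on CompactionPartitionId is what allows the id to serve as a hash-map key, so files whose names share a start key and date collapse into one partition. A small sketch of that value-key behavior (PartitionIdDemo is illustrative, and sits in this package only because the nested class is protected):

package org.apache.hadoop.hbase.mob.filecompactions;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;

public class PartitionIdDemo {
  public static void main(String[] args) {
    Map<CompactionPartitionId, Integer> fileCounts =
        new HashMap<CompactionPartitionId, Integer>();
    // Two ids built from the same start key and date are equal...
    CompactionPartitionId first = new CompactionPartitionId("row0", "20150528");
    CompactionPartitionId second = new CompactionPartitionId("row0", "20150528");
    fileCounts.put(first, 1);
    fileCounts.put(second, 2);
    // ...so both puts land in the same bucket and the map holds one entry.
    System.out.println(fileCounts.size()); // prints 1
  }
}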