You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/18 16:04:39 UTC

git commit: HBASE-11646 Handle the MOB in compaction (Jingcheng Du)

Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 7cd71d1db -> 7dbd828a9


HBASE-11646 Handle the MOB in compaction (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7dbd828a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7dbd828a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7dbd828a

Branch: refs/heads/hbase-11339
Commit: 7dbd828a90e36c3b94fcadf659ce047a1ddbce0e
Parents: 7cd71d1
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Thu Sep 18 07:03:21 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Thu Sep 18 07:03:21 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mob/DefaultMobCompactor.java   | 233 +++++++++++++
 .../apache/hadoop/hbase/mob/MobStoreEngine.java |   8 +
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  40 +++
 .../hadoop/hbase/regionserver/HMobStore.java    |   5 +-
 .../regionserver/compactions/Compactor.java     |  49 ++-
 .../compactions/DefaultCompactor.java           |  23 +-
 .../compactions/StripeCompactor.java            |   3 +-
 .../hbase/regionserver/TestMobCompaction.java   | 332 +++++++++++++++++++
 8 files changed, 678 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
new file mode 100644
index 0000000..5f13502
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Compact passed set of files in the mob-enabled column family.
+ */
+@InterfaceAudience.Private
+public class DefaultMobCompactor extends DefaultCompactor {
+
+  private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
+  private long mobSizeThreshold;
+  private HMobStore mobStore;
+  public DefaultMobCompactor(Configuration conf, Store store) {
+    super(conf, store);
+    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
+    // During the compaction, the compactor reads the cells from the mob files and
+    // probably creates new mob files. All of these operations are included in HMobStore,
+    // so we need to cast the Store to HMobStore.
+    if (!(store instanceof HMobStore)) {
+      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
+    }
+    mobStore = (HMobStore) store;
+    mobSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
+  }
+
+  /**
+   * Creates a writer for a new file in a temporary directory.
+   * @param fd The file details.
+   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @return Writer for a new StoreFile in the tmp dir.
+   * @throws IOException
+   */
+  @Override
+  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+    // make this writer with tags always because of possible new cells with tags.
+    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+        true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
+    return writer;
+  }
+
+  /**
+   * Performs compaction on a column family with the mob flag enabled.
+   * This is for when the mob threshold size has changed or if the mob
+   * column family mode has been toggled via an alter table statement.
+   * Compacts the files by the following rules.
+   * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
+   * <ol>
+   * <li>
+   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
+   * directly copy the (with mob tag) cell into the new store file.
+   * </li>
+   * <li>
+   * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
+   * the new store file.
+   * </li>
+   * </ol>
+   * 2. If the cell doesn't have a reference tag.
+   * <ol>
+   * <li>
+   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
+   * write this cell to a mob file, and write the path of this mob file to the store file.
+   * </li>
+   * <li>
+   * Otherwise, directly write this cell into the store file.
+   * </li>
+   * </ol>
+   * @param fd File details
+   * @param scanner Where to read from.
+   * @param writer Where to write to.
+   * @param smallestReadPoint Smallest read point.
+   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
+   * @param major Is a major compaction.
+   * @return Whether compaction ended; false if it was interrupted for any reason.
+   */
+  @Override
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+      long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
+    int bytesWritten = 0;
+    // Since scanner.next() can return 'false' but still be delivering data,
+    // we have to use a do/while loop.
+    List<Cell> cells = new ArrayList<Cell>();
+    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+    int closeCheckInterval = HStore.getCloseCheckInterval();
+    boolean hasMore;
+    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
+    byte[] fileName = null;
+    StoreFile.Writer mobFileWriter = null;
+    long mobCells = 0;
+    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+            .getName());
+    try {
+      try {
+        // If the mob file writer could not be created, directly write the cell to the store file.
+        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+            store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+      } catch (IOException e) {
+        LOG.error(
+            "Fail to create mob writer, "
+                + "we will continue the compaction by writing MOB cells directly in store files",
+            e);
+      }
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        // output to writer:
+        for (Cell c : cells) {
+          // TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk.
+          KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+          resetSeqId(smallestReadPoint, cleanSeqId, kv);
+          if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+            // If the mob file writer is null or the kv type is not put, directly write the cell
+            // to the store file.
+            writer.append(kv);
+          } else if (MobUtils.isMobReferenceCell(kv)) {
+            if (MobUtils.isValidMobRefCellValue(kv)) {
+              int size = MobUtils.getMobValueLength(kv);
+              if (size > mobSizeThreshold) {
+                // If the value size is larger than the threshold, it's regarded as a mob. Since
+                // its value is already in the mob file, directly write this cell to the store file
+                writer.append(kv);
+              } else {
+                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
+                // the mob cell from the mob file, and write it back to the store file.
+                Cell cell = mobStore.resolve(kv, false);
+                if (cell.getValueLength() != 0) {
+                  // put the mob data back to the store file
+                  KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
+                  mobKv.setSequenceId(kv.getSequenceId());
+                  writer.append(mobKv);
+                } else {
+                  // If the value of a file is empty, there might be issues when retrieving,
+                  // directly write the cell to the store file, and leave it to be handled by the
+                  // next compaction.
+                  writer.append(kv);
+                }
+              }
+            } else {
+              LOG.warn("The value format of the KeyValue " + kv
+                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
+              writer.append(kv);
+            }
+          } else if (kv.getValueLength() <= mobSizeThreshold) {
+            // If the value size of a cell is not larger than the threshold, directly write it to
+            // the store file.
+            writer.append(kv);
+          } else {
+            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
+            // write this cell to a mob file, and write the path to the store file.
+            mobCells++;
+            // append the original keyValue in the mob file.
+            mobFileWriter.append(kv);
+            KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+            // write the cell whose value is the path of a mob file to the store file.
+            writer.append(reference);
+          }
+          ++progress.currentCompactedKVs;
+
+          // check periodically to see if a system stop is requested
+          if (closeCheckInterval > 0) {
+            bytesWritten += kv.getLength();
+            if (bytesWritten > closeCheckInterval) {
+              bytesWritten = 0;
+              if (!store.areWritesEnabled()) {
+                progress.cancel();
+                return false;
+              }
+            }
+          }
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      if (mobFileWriter != null) {
+        appendMetadataAndCloseWriter(mobFileWriter, fd, major);
+      }
+    }
+    if(mobFileWriter!=null) {
+      if (mobCells > 0) {
+        // If the mob file is not empty, commit it.
+        mobStore.commitFile(mobFileWriter.getPath(), path);
+      } else {
+        try {
+          // If the mob file is empty, delete it instead of committing.
+          store.getFileSystem().delete(mobFileWriter.getPath(), true);
+        } catch (IOException e) {
+          LOG.error("Fail to delete the temp mob file", e);
+        }
+      }
+    }
+    progress.complete();
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
index d5e6f2e..2d5f1ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -37,4 +37,12 @@ public class MobStoreEngine extends DefaultStoreEngine {
     // specific compactor and policy when that is implemented.
     storeFlusher = new DefaultMobStoreFlusher(conf, store);
   }
+
+  /**
+   * Creates the DefaultMobCompactor.
+   */
+  @Override
+  protected void createCompactor(Configuration conf, Store store) throws IOException {
+    compactor = new DefaultMobCompactor(conf, store);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index c106e0b..e52d336 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -260,4 +260,44 @@ public class MobUtils {
     reference.setSequenceId(kv.getSequenceId());
     return reference;
   }
+
+  /**
+   * Indicates whether the current mob ref cell has a valid value.
+   * A mob ref cell has a mob reference tag.
+   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
+   * The real mob value length takes 4 bytes.
+   * The remaining part is the mob file name.
+   * @param cell The mob ref cell.
+   * @return True if the cell has a valid value.
+   */
+  public static boolean isValidMobRefCellValue(Cell cell) {
+    return cell.getValueLength() > Bytes.SIZEOF_INT;
+  }
+
+  /**
+   * Gets the mob value length from the mob ref cell.
+   * A mob ref cell has a mob reference tag.
+   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
+   * The real mob value length takes 4 bytes.
+   * The remaining part is the mob file name.
+   * @param cell The mob ref cell.
+   * @return The real mob value length.
+   */
+  public static int getMobValueLength(Cell cell) {
+    return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
+  }
+
+  /**
+   * Gets the mob file name from the mob ref cell.
+   * A mob ref cell has a mob reference tag.
+   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
+   * The real mob value length takes 4 bytes.
+   * The remaining part is the mob file name.
+   * @param cell The mob ref cell.
+   * @return The mob file name.
+   */
+  public static String getMobFileName(Cell cell) {
+    return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
+        cell.getValueLength() - Bytes.SIZEOF_INT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 17d3802..9c6f34e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -219,9 +219,8 @@ public class HMobStore extends HStore {
    */
   public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
     Cell result = null;
-    if (reference.getValueLength() > Bytes.SIZEOF_INT) {
-      String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
-          + Bytes.SIZEOF_INT, reference.getValueLength() - Bytes.SIZEOF_INT);
+    if (MobUtils.isValidMobRefCellValue(reference)) {
+      String fileName = MobUtils.getMobFileName(reference);
       Path targetPath = new Path(mobFamilyPath, fileName);
       MobFile file = null;
       try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 2b053a6..13967c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -57,7 +59,7 @@ public abstract class Compactor {
   protected Configuration conf;
   protected Store store;
 
-  private int compactionKVMax;
+  protected int compactionKVMax;
   protected Compression.Algorithm compactionCompression;
   
   /** specify how many days to keep MVCC values during major compaction **/ 
@@ -92,6 +94,8 @@ public abstract class Compactor {
     public long maxKeyCount = 0;
     /** Earliest put timestamp if major compaction */
     public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    /** Latest put timestamp */
+    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
     /** The last key in the files we're compacting. */
     public long maxSeqId = 0;
     /** Latest memstore read point found in any of the involved files */
@@ -158,6 +162,14 @@ public abstract class Compactor {
           fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
         }
       }
+      tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
+      TimeRangeTracker trt = new TimeRangeTracker();
+      if (tmp == null) {
+        fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
+      } else {
+        Writables.copyWritable(tmp, trt);
+        fd.latestPutTs = trt.getMaximumTimestamp();
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Compacting " + file +
           ", keycount=" + keyCount +
@@ -216,14 +228,16 @@ public abstract class Compactor {
 
   /**
    * Performs the compaction.
+   * @param fd File details
    * @param scanner Where to read from.
    * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
+   * @param major Is a major compaction.
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
-  protected boolean performCompaction(InternalScanner scanner,
-      CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+      long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
     int bytesWritten = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
@@ -241,9 +255,7 @@ public abstract class Compactor {
       // output to writer:
       for (Cell c : kvs) {
         KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-        if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
-          CellUtil.setSequenceId(kv, 0);
-        }
+        resetSeqId(smallestReadPoint, cleanSeqId, kv);
         writer.append(kv);
         ++progress.currentCompactedKVs;
         progress.totalCompactedSize += kv.getLength();
@@ -309,4 +321,29 @@ public abstract class Compactor {
     return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
   }
+
+  /**
+   * Resets the sequence id.
+   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @param cleanSeqId Should clean the sequence id.
+   * @param kv The current KeyValue.
+   */
+  protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) {
+    if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
+      kv.setSequenceId(0);
+    }
+  }
+
+  /**
+   * Appends the metadata and closes the writer.
+   * @param writer The current store writer.
+   * @param fd The file details.
+   * @param isMajor Is a major compaction.
+   * @throws IOException
+   */
+  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
+      boolean isMajor) throws IOException {
+    writer.appendMetadata(fd.maxSeqId, isMajor);
+    writer.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index d5b2b63..8056dd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -76,9 +76,9 @@ public class DefaultCompactor extends Compactor {
           smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
           cleanSeqId = true;
         }
-        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-            fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
-        boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
+        writer = createTmpWriter(fd, smallestReadPoint);
+        boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+            request.isAllFiles());
         if (!finished) {
           writer.close();
           store.getFileSystem().delete(writer.getPath(), false);
@@ -94,8 +94,7 @@ public class DefaultCompactor extends Compactor {
       }
     } finally {
       if (writer != null) {
-        writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
-        writer.close();
+        appendMetadataAndCloseWriter(writer, fd, request.isAllFiles());
         newFiles.add(writer.getPath());
       }
     }
@@ -103,6 +102,20 @@ public class DefaultCompactor extends Compactor {
   }
 
   /**
+   * Creates a writer for a new file in a temporary directory.
+   * @param fd The file details.
+   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @return Writer for a new StoreFile in the tmp dir.
+   * @throws IOException
+   */
+  protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
+      throws IOException {
+    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+        true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+    return writer;
+  }
+
+  /**
    * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
    * {@link #compact(CompactionRequest)};
    * @param filesToCompact the files to compact. These are used as the compactionSelection for

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 487ff46..3109015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -127,7 +127,8 @@ public class StripeCompactor extends Compactor {
       // It is ok here if storeScanner is null.
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
       mw.init(storeScanner, factory, store.getComparator());
-      finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
+      finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId,
+          request.isMajor());
       if (!finished) {
         throw new InterruptedIOException( "Aborting compaction of store " + store +
             " in region " + store.getRegionInfo().getRegionNameAsString() +

http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
new file mode 100644
index 0000000..f8d6ce4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -0,0 +1,332 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test mob compaction
+ */
+@Category(MediumTests.class)
+public class TestMobCompaction {
+  @Rule
+  public TestName name = new TestName();
+  static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
+  private HBaseTestingUtility UTIL = null;
+  private Configuration conf = null;
+
+  private HRegion region = null;
+  private HTableDescriptor htd = null;
+  private HColumnDescriptor hcd = null;
+  private long mobCellThreshold = 1000;
+
+  private FileSystem fs;
+
+  private static final byte[] COLUMN_FAMILY = fam1;
+  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
+  private int compactionThreshold;
+
+  private void init(long mobThreshold) throws Exception {
+    this.mobCellThreshold = mobThreshold;
+
+    UTIL = HBaseTestingUtility.createLocalHTU();
+
+    conf = UTIL.getConfiguration();
+    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    htd = UTIL.createTableDescriptor(name.getMethodName());
+    hcd = new HColumnDescriptor(COLUMN_FAMILY);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(mobThreshold));
+    hcd.setMaxVersions(1);
+    htd.addFamily(hcd);
+
+    region = UTIL.createLocalHRegion(htd, null, null);
+    fs = FileSystem.get(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    region.close();
+    fs.delete(UTIL.getDataTestDir(), true);
+  }
+
+  /**
+   * During compaction, cells smaller than the threshold won't be affected.
+   */
+  @Test
+  public void testSmallerValue() throws Exception {
+    init(500);
+    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
+    HRegionIncommon loader = new HRegionIncommon(region);
+    // one hfile per row
+    for (int i = 0; i < compactionThreshold; i++) {
+      Put p = createPut(i, dummyData);
+      loader.put(p);
+      loader.flushcache();
+    }
+    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+    assertEquals("Before compaction: mob file count", 0, countMobFiles());
+    assertEquals("Before compaction: rows", compactionThreshold, countRows());
+    assertEquals("Before compaction: mob rows", 0, countMobRows());
+
+    region.compactStores();
+
+    assertEquals("After compaction: store files", 1, countStoreFiles());
+    assertEquals("After compaction: mob file count", 0, countMobFiles());
+    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
+    assertEquals("After compaction: rows", compactionThreshold, countRows());
+    assertEquals("After compaction: mob rows", 0, countMobRows());
+  }
+
+  /**
+   * During compaction, the mob threshold size is changed.
+   */
+  @Test
+  public void testLargerValue() throws Exception {
+    init(200);
+    byte[] dummyData = makeDummyData(300); // larger than mob threshold
+    HRegionIncommon loader = new HRegionIncommon(region);
+    for (int i = 0; i < compactionThreshold; i++) {
+      Put p = createPut(i, dummyData);
+      loader.put(p);
+      loader.flushcache();
+    }
+    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
+    assertEquals("Before compaction: rows", compactionThreshold, countRows());
+    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
+    // Change the threshold larger than the data size
+    region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
+        MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
+    region.initialize();
+    region.compactStores(true);
+    assertEquals("After compaction: store files", 1, countStoreFiles());
+    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
+    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
+    assertEquals("After compaction: rows", compactionThreshold, countRows());
+    assertEquals("After compaction: mob rows", 0, countMobRows());
+  }
+
+  /**
+   * This test will first generate store files, then bulk load them and trigger the compaction. When
+   * compaction, the cell value will be larger than the threshold.
+   */
+  @Test
+  public void testMobCompactionWithBulkload() throws Exception {
+    // The following will produce store files of 600.
+    init(300);
+    byte[] dummyData = makeDummyData(600);
+
+    Path hbaseRootDir = FSUtils.getRootDir(conf);
+    Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
+    List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
+    for (int i = 0; i < compactionThreshold; i++) {
+      Path hpath = new Path(basedir, "hfile" + i);
+      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
+      createHFile(hpath, i, dummyData);
+    }
+
+    // The following will bulk load the above generated store files and compact, with 600(fileSize)
+    // > 300(threshold)
+    boolean result = region.bulkLoadHFiles(hfiles, true);
+    assertTrue("Bulkload result:", result);
+    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+    assertEquals("Before compaction: mob file count", 0, countMobFiles());
+    assertEquals("Before compaction: rows", compactionThreshold, countRows());
+    assertEquals("Before compaction: mob rows", 0, countMobRows());
+    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
+
+    region.compactStores();
+
+    assertEquals("After compaction: store files", 1, countStoreFiles());
+    assertEquals("After compaction: mob file count:", 1, countMobFiles());
+    assertEquals("After compaction: rows", compactionThreshold, countRows());
+    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
+    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
+  }
+
+  private int countStoreFiles() throws IOException {
+    Store store = region.getStore(COLUMN_FAMILY);
+    return store.getStorefilesCount();
+  }
+
+  private int countMobFiles() throws IOException {
+    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
+        hcd.getNameAsString());
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
+      return files.length;
+    }
+    return 0;
+  }
+
+  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
+    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
+    p.setDurability(Durability.SKIP_WAL);
+    p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
+    return p;
+  }
+
+  /**
+   * Create an HFile with the given number of bytes
+   */
+  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
+    HFileContext meta = new HFileContextBuilder().build();
+    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
+        .withFileContext(meta).create();
+    long now = System.currentTimeMillis();
+    try {
+      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
+          Bytes.toBytes("colX"), now, dummyData);
+      writer.append(kv);
+    } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+      writer.close();
+    }
+  }
+
+  private int countMobRows() throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    InternalScanner scanner = region.getScanner(scan);
+
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<Cell>();
+    boolean hasMore = scanner.next(results);
+    while (hasMore) {
+      for (Cell c : results) {
+        if (MobUtils.isMobReferenceCell(c)) {
+          scannedCount++;
+        }
+      }
+      hasMore = scanner.next(results);
+    }
+    scanner.close();
+
+    return scannedCount;
+  }
+
+  private int countRows() throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    InternalScanner scanner = region.getScanner(scan);
+
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<Cell>();
+    boolean hasMore = scanner.next(results);
+    while (hasMore) {
+      scannedCount += results.size();
+      hasMore = scanner.next(results);
+    }
+    scanner.close();
+
+    return scannedCount;
+  }
+
+  private byte[] makeDummyData(int size) {
+    byte[] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+
+  private int countReferencedMobFiles() throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    InternalScanner scanner = region.getScanner(scan);
+
+    List<Cell> kvs = new ArrayList<Cell>();
+    boolean hasMore = true;
+    String fileName;
+    Set<String> files = new HashSet<String>();
+    do {
+      kvs.clear();
+      hasMore = scanner.next(kvs);
+      for (Cell c : kvs) {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+        if (!MobUtils.isMobReferenceCell(kv)) {
+          continue;
+        }
+        if (!MobUtils.isValidMobRefCellValue(kv)) {
+          continue;
+        }
+        int size = MobUtils.getMobValueLength(kv);
+        if (size <= mobCellThreshold) {
+          continue;
+        }
+        fileName = MobUtils.getMobFileName(kv);
+        if (fileName.isEmpty()) {
+          continue;
+        }
+        files.add(fileName);
+        Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
+            hcd.getNameAsString());
+        assertTrue(fs.exists(new Path(familyPath, fileName)));
+      }
+    } while (hasMore);
+
+    scanner.close();
+
+    return files.size();
+  }
+}