You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by es...@apache.org on 2017/11/01 07:29:35 UTC

[1/2] hbase git commit: HBASE-16417: In-memory MemStore Policy for Flattening and Compactions

Repository: hbase
Updated Branches:
  refs/heads/master d7cf88947 -> 17e7aff37


http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 64ee8dc..4a414d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -1177,7 +1178,8 @@ public class TestHStore {
   }
 
   @Test
-  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
+  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
+      InterruptedException {
     final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
     final int expectedSize = 2;
     testFlushBeforeCompletingScan(new MyListHook() {
@@ -1364,7 +1366,8 @@ public class TestHStore {
     myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
     long snapshotId = id++;
     // push older data into snapshot -- phase (1/4)
-    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
+        .DUMMY);
     storeFlushCtx.prepare();
 
     // insert current data into active -- phase (2/4)
@@ -1464,7 +1467,7 @@ public class TestHStore {
     conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
     conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
     // Set the lower threshold to invoke the "MERGE" policy
-    conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
+    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
     init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
         .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
     byte[] value = Bytes.toBytes("thisisavarylargevalue");
@@ -1551,8 +1554,8 @@ public class TestHStore {
   private class MyStore extends HStore {
     private final MyStoreHook hook;
 
-    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration confParam,
-        MyStoreHook hook, boolean switchToPread) throws IOException {
+    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
+        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
     }
@@ -1669,7 +1672,8 @@ public class TestHStore {
   private static class MyMemStoreCompactor extends MemStoreCompactor {
     private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
     private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
-    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
+    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
+        compactionPolicy) throws IllegalArgumentIOException {
       super(compactingMemStore, compactionPolicy);
     }
 
@@ -1697,7 +1701,8 @@ public class TestHStore {
     }
 
     @Override
-    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
+    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
+        throws IllegalArgumentIOException {
       return new MyMemStoreCompactor(this, compactionPolicy);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index e559b48..2c9a437 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -141,7 +141,7 @@ public class TestRecoveredEdits {
     // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if
     // we flush at 1MB, that there are at least 3 flushed files that are there because of the
     // replay of edits.
-    if(policy == MemoryCompactionPolicy.EAGER) {
+    if(policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
       assertTrue("Files count=" + storeFiles.size(), storeFiles.size() >= 1);
     } else {
       assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index a012d09..6a64796 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -775,7 +775,7 @@ public class TestWalAndCompactingMemStoreFlush {
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(MemoryCompactionPolicy.BASIC));
     // length of pipeline that requires merge
-    conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
+    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
 
     // Intialize the HRegion
     HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);


[2/2] hbase git commit: HBASE-16417: In-memory MemStore Policy for Flattening and Compactions

Posted by es...@apache.org.
HBASE-16417: In-memory MemStore Policy for Flattening and Compactions


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/17e7aff3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/17e7aff3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/17e7aff3

Branch: refs/heads/master
Commit: 17e7aff37e6b69cae0fb6d15ebeb2037d1ca6acc
Parents: d7cf889
Author: eshcar <es...@yahoo-inc.com>
Authored: Wed Nov 1 09:24:36 2017 +0200
Committer: eshcar <es...@yahoo-inc.com>
Committed: Wed Nov 1 09:26:00 2017 +0200

----------------------------------------------------------------------
 .../hadoop/hbase/MemoryCompactionPolicy.java    |   8 +-
 .../java/org/apache/hadoop/hbase/CellUtil.java  |  29 ++++
 .../org/apache/hadoop/hbase/util/ClassSize.java |   3 +-
 .../AdaptiveMemStoreCompactionStrategy.java     | 112 ++++++++++++++
 .../BasicMemStoreCompactionStrategy.java        |  43 ++++++
 .../regionserver/CellArrayImmutableSegment.java |  57 ++++++-
 .../regionserver/CellChunkImmutableSegment.java |  58 +++++--
 .../hadoop/hbase/regionserver/CellChunkMap.java |   1 -
 .../hadoop/hbase/regionserver/CellSet.java      |  20 ++-
 .../hbase/regionserver/CompactingMemStore.java  |  37 ++---
 .../hbase/regionserver/CompactionPipeline.java  |  12 +-
 .../EagerMemStoreCompactionStrategy.java        |  36 +++++
 .../hadoop/hbase/regionserver/HStore.java       |  24 ++-
 .../hbase/regionserver/ImmutableSegment.java    |  12 +-
 .../MemStoreCompactionStrategy.java             | 114 ++++++++++++++
 .../hbase/regionserver/MemStoreCompactor.java   | 153 +++++++------------
 .../hbase/regionserver/SegmentFactory.java      |  31 ++--
 .../regionserver/VersionedSegmentsList.java     |  27 ++++
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   2 +-
 .../regionserver/TestCompactingMemStore.java    | 120 +++++++++++++--
 .../TestCompactingToCellFlatMapMemStore.java    |  23 +--
 .../hadoop/hbase/regionserver/TestHStore.java   |  19 ++-
 .../hbase/regionserver/TestRecoveredEdits.java  |   2 +-
 .../TestWalAndCompactingMemStoreFlush.java      |   2 +-
 24 files changed, 728 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
index 654e7ab..099ea40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
@@ -42,5 +42,11 @@ public enum MemoryCompactionPolicy {
    * on-disk compaction does after the data is flushed to disk). This policy is most useful for
    * applications with high data churn or small working sets.
    */
-  EAGER
+  EAGER,
+  /**
+   * Adaptive compaction adapts to the workload. It applies either index compaction or data
+   * compaction based on the ratio of duplicate cells in the data.
+   */
+  ADAPTIVE
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 20c217f..8a5bb2c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -1372,6 +1372,35 @@ public final class CellUtil {
     return matchingColumn(left, right);
   }
 
+  public static boolean matchingRowColumnBytes(final Cell left, final Cell right) {
+    int lrowlength = left.getRowLength();
+    int rrowlength = right.getRowLength();
+    int lfamlength = left.getFamilyLength();
+    int rfamlength = right.getFamilyLength();
+    int lqlength = left.getQualifierLength();
+    int rqlength = right.getQualifierLength();
+    // match length
+    if ((lrowlength + lfamlength + lqlength) !=
+        (rrowlength + rfamlength + rqlength)) {
+      return false;
+    }
+
+    // match row
+    if (!Bytes.equals(left.getRowArray(), left.getRowOffset(), lrowlength, right.getRowArray(),
+        right.getRowOffset(), rrowlength)) {
+      return false;
+    }
+    //match family
+    if (!Bytes.equals(left.getFamilyArray(), left.getFamilyOffset(), lfamlength,
+        right.getFamilyArray(), right.getFamilyOffset(), rfamlength)) {
+      return false;
+    }
+    //match qualifier
+    return Bytes.equals(left.getQualifierArray(), left.getQualifierOffset(),
+        lqlength, right.getQualifierArray(), right.getQualifierOffset(),
+        rqlength);
+  }
+
   /**
    * Compares the cell's qualifier with the given byte[]
    * @param left the cell for which the qualifier has to be compared

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index d9ea761..efcf8d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -330,9 +330,10 @@ public class ClassSize {
     TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
 
     SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
+
     NON_SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG);
 
-    CELL_SET = align(OBJECT + REFERENCE);
+    CELL_SET = align(OBJECT + REFERENCE + Bytes.SIZEOF_INT);
 
     STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
new file mode 100644
index 0000000..232ffe3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adaptive is a heuristic that chooses whether to apply data compaction or not based on the
+ * level of redundancy in the data. Adaptive triggers redundancy elimination only for those
+ * stores where positive impact is expected.
+ *
+ * Adaptive uses two parameters to determine whether to perform redundancy elimination.
+ * The first parameter, u, estimates the ratio of unique keys in the memory store based on the
+ * fraction of unique keys encountered during the previous merge of segment indices.
+ * The second is the perceived probability (compactionProbability) that the store can benefit from
+ * redundancy elimination. Initially, compactionProbability=0.5; it then grows exponentially by
+ * 2% whenever a compaction is successful and decreased by 2% whenever a compaction did not meet
+ * the expectation. It is reset back to the default value (namely 0.5) upon disk flush.
+ *
+ * Adaptive triggers redundancy elimination with probability compactionProbability if the
+ * fraction of redundant keys 1-u exceeds a parameter threshold compactionThreshold.
+ */
+@InterfaceAudience.Private
+public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
+
+  private static final String name = "ADAPTIVE";
+  public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
+      "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
+  private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
+  public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
+      "hbase.hregion.compacting.memstore.adaptive.compaction.probability";
+  private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
+  private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;
+
+  private double compactionThreshold;
+  private double initialCompactionProbability;
+  private double compactionProbability;
+  private Random rand = new Random();
+  private double numCellsInVersionedList = 0;
+  private boolean compacted = false;
+
+  public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
+    super(conf, cfName);
+    compactionThreshold = conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY,
+        ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
+    initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
+        ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
+    resetStats();
+  }
+
+  @Override public Action getAction(VersionedSegmentsList versionedList) {
+    if (versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold) {
+      double r = rand.nextDouble();
+      if(r < compactionProbability) {
+        numCellsInVersionedList = versionedList.getNumOfCells();
+        compacted = true;
+        return compact(versionedList, name+" (compaction probability="+compactionProbability+")");
+      }
+    }
+    compacted = false;
+    return simpleMergeOrFlatten(versionedList,
+        name+" (compaction probability="+compactionProbability+")");
+  }
+
+  @Override
+  public void updateStats(Segment replacement) {
+    if(compacted) {
+      if (replacement.getCellsCount() / numCellsInVersionedList < 1.0 - compactionThreshold) {
+        // compaction was a good decision - increase probability
+        compactionProbability *= ADAPTIVE_PROBABILITY_FACTOR;
+        if(compactionProbability > 1.0) {
+          compactionProbability = 1.0;
+        }
+      } else {
+        // compaction was NOT a good decision - decrease probability
+        compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
+      }
+    }
+  }
+
+  @Override
+  public void resetStats() {
+    compactionProbability = initialCompactionProbability;
+  }
+  protected Action getMergingAction() {
+    return Action.MERGE_COUNT_UNIQUE_KEYS;
+  }
+
+  protected Action getFlattenAction() {
+    return Action.FLATTEN;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
new file mode 100644
index 0000000..d816fc1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Basic strategy chooses between two actions: flattening a segment or merging indices of all
+ * segments in the pipeline.
+ * If number of segments in pipeline exceed the limit defined in MemStoreCompactionStrategy then
+ * apply merge, otherwise flatten some segment.
+ */
+@InterfaceAudience.Private
+public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
+
+  private static final String name = "BASIC";
+
+  public BasicMemStoreCompactionStrategy(Configuration conf, String cfName) {
+    super(conf, cfName);
+  }
+
+  @Override
+  public Action getAction(VersionedSegmentsList versionedList) {
+    return simpleMergeOrFlatten(versionedList, name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
index 8cd5f2a..0e80b1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 
@@ -42,7 +43,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
    * The given iterator returns the Cells that "survived" the compaction.
    */
   protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
     super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
     incSize(0, DEEP_OVERHEAD_CAM);
     // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@@ -54,12 +55,14 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
    * of CSLMImmutableSegment
    * The given iterator returns the Cells that "survived" the compaction.
    */
-  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) {
+  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
+      MemStoreCompactionStrategy.Action action) {
     super(segment); // initiailize the upper class
     incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap and update the CellSet of this Segment
-    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
+        action);
     // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
     // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
     long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
@@ -81,14 +84,18 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
   /*------------------------------------------------------------------------*/
   // Create CellSet based on CellArrayMap from compacting iterator
   private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
-      MemStoreCompactor.Action action) {
+      MemStoreCompactionStrategy.Action action) {
 
+    boolean merge = (action == MemStoreCompactionStrategy.Action.MERGE ||
+        action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
     Cell[] cells = new Cell[numOfCells];   // build the Cell Array
     int i = 0;
+    int numUniqueKeys=0;
+    Cell prev = null;
     while (iterator.hasNext()) {
       Cell c = iterator.next();
       // The scanner behind the iterator is doing all the elimination logic
-      if (action == MemStoreCompactor.Action.MERGE) {
+      if (merge) {
         // if this is merge we just move the Cell object without copying MSLAB
         // the sizes still need to be updated in the new segment
         cells[i] = c;
@@ -99,11 +106,27 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
       // second parameter true, because in compaction/merge the addition of the cell to new segment
       // is always successful
       updateMetaInfo(c, true, null); // updates the size per cell
+      if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+        //counting number of unique keys
+        if (prev != null) {
+          if (!CellUtil.matchingRowColumnBytes(prev, c)) {
+            numUniqueKeys++;
+          }
+        } else {
+          numUniqueKeys++;
+        }
+      }
+      prev = c;
       i++;
     }
+    if(action == MemStoreCompactionStrategy.Action.COMPACT) {
+      numUniqueKeys = numOfCells;
+    } else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+      numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
+    }
     // build the immutable CellSet
     CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
-    this.setCellSet(null, new CellSet(cam));   // update the CellSet of this Segment
+    this.setCellSet(null, new CellSet(cam, numUniqueKeys));   // update the CellSet of this Segment
   }
 
   /*------------------------------------------------------------------------*/
@@ -111,22 +134,40 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
   // (without compacting iterator)
   // We do not consider cells bigger than chunks!
   private void reinitializeCellSet(
-      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
+      MemStoreCompactionStrategy.Action action) {
     Cell[] cells = new Cell[numOfCells];   // build the Cell Array
     Cell curCell;
     int idx = 0;
+    int numUniqueKeys=0;
+    Cell prev = null;
     try {
       while ((curCell = segmentScanner.next()) != null) {
         cells[idx++] = curCell;
+        if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+          //counting number of unique keys
+          if (prev != null) {
+            if (!CellUtil.matchingRowColumn(prev, curCell)) {
+              numUniqueKeys++;
+            }
+          } else {
+            numUniqueKeys++;
+          }
+        }
+        prev = curCell;
       }
     } catch (IOException ie) {
       throw new IllegalStateException(ie);
     } finally {
       segmentScanner.close();
     }
+    if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+      numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
+    }
     // build the immutable CellSet
     CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
-    this.setCellSet(oldCellSet, new CellSet(cam));   // update the CellSet of this Segment
+    // update the CellSet of this Segment
+    this.setCellSet(oldCellSet, new CellSet(cam, numUniqueKeys));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
index 4ef0657..7db00a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -48,7 +49,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
    * The given iterator returns the Cells that "survived" the compaction.
    */
   protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
     super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
     incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
     // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@@ -61,12 +62,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
    * The given iterator returns the Cells that "survived" the compaction.
    */
   protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
-      MemStoreSizing memstoreSizing) {
+      MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
     super(segment); // initiailize the upper class
     incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap
-    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
+        action);
     // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
     // add sizes of CellChunkMap entry, decrease also Cell object sizes
     // (reinitializeCellSet doesn't take the care for the sizes)
@@ -90,15 +92,17 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
   /*------------------------------------------------------------------------*/
   // Create CellSet based on CellChunkMap from compacting iterator
   private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
-      MemStoreCompactor.Action action) {
+      MemStoreCompactionStrategy.Action action) {
 
     // calculate how many chunks we will need for index
     int chunkSize = ChunkCreator.getInstance().getChunkSize();
     int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
-    int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk);
+    int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
     int numOfCellsAfterCompaction = 0;
     int currentChunkIdx = 0;
     int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+    int numUniqueKeys=0;
+    Cell prev = null;
     // all index Chunks are allocated from ChunkCreator
     Chunk[] chunks = new Chunk[numberOfChunks];
     for (int i=0; i < numberOfChunks; i++) {
@@ -112,7 +116,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
         currentChunkIdx++;              // continue to the next index chunk
         offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
       }
-      if (action == MemStoreCompactor.Action.COMPACT) {
+      if (action == MemStoreCompactionStrategy.Action.COMPACT) {
         c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy)
       }
       offsetInCurentChunk = // add the Cell reference to the index chunk
@@ -122,11 +126,27 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
       // second parameter true, because in compaction/merge the addition of the cell to new segment
       // is always successful
       updateMetaInfo(c, true, null); // updates the size per cell
+      if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+        //counting number of unique keys
+        if (prev != null) {
+          if (!CellUtil.matchingRowColumnBytes(prev, c)) {
+            numUniqueKeys++;
+          }
+        } else {
+          numUniqueKeys++;
+        }
+      }
+      prev = c;
+    }
+    if(action == MemStoreCompactionStrategy.Action.COMPACT) {
+      numUniqueKeys = numOfCells;
+    } else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+      numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
     }
     // build the immutable CellSet
     CellChunkMap ccm =
         new CellChunkMap(getComparator(), chunks, 0, numOfCellsAfterCompaction, false);
-    this.setCellSet(null, new CellSet(ccm));  // update the CellSet of this Segment
+    this.setCellSet(null, new CellSet(ccm, numUniqueKeys));  // update the CellSet of this Segment
   }
 
   /*------------------------------------------------------------------------*/
@@ -135,12 +155,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
   // This is a service for not-flat immutable segments
   // Assumption: cells do not exceed chunk size!
   private void reinitializeCellSet(
-      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
+      MemStoreCompactionStrategy.Action action) {
     Cell curCell;
     // calculate how many chunks we will need for metadata
     int chunkSize = ChunkCreator.getInstance().getChunkSize();
     int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
-    int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk);
+    int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
     // all index Chunks are allocated from ChunkCreator
     Chunk[] chunks = new Chunk[numberOfChunks];
     for (int i=0; i < numberOfChunks; i++) {
@@ -150,6 +171,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
     int currentChunkIdx = 0;
     int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
 
+    int numUniqueKeys=0;
+    Cell prev = null;
     try {
       while ((curCell = segmentScanner.next()) != null) {
         assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
@@ -161,6 +184,20 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
         offsetInCurentChunk =
             createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
                 offsetInCurentChunk);
+        if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+          //counting number of unique keys
+          if (prev != null) {
+            if (!CellUtil.matchingRowColumn(prev, curCell)) {
+              numUniqueKeys++;
+            }
+          } else {
+            numUniqueKeys++;
+          }
+        }
+        prev = curCell;
+      }
+      if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+        numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
       }
     } catch (IOException ie) {
       throw new IllegalStateException(ie);
@@ -169,7 +206,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
     }
 
     CellChunkMap ccm = new CellChunkMap(getComparator(), chunks, 0, numOfCells, false);
-    this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment
+    // update the CellSet of this Segment
+    this.setCellSet(oldCellSet, new CellSet(ccm, numUniqueKeys));
   }
 
   /*------------------------------------------------------------------------*/

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
index 3b11baa..3fbd46d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index e16d961..56717ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -41,6 +41,8 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class CellSet implements NavigableSet<Cell>  {
+
+  public static final int UNKNOWN_NUM_UNIQUES = -1;
   // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
   // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
   // is not already present.", this implementation "Adds the specified element to this set EVEN
@@ -48,12 +50,22 @@ public class CellSet implements NavigableSet<Cell>  {
   // Otherwise, has same attributes as ConcurrentSkipListSet
   private final NavigableMap<Cell, Cell> delegatee; ///
 
+  private final int numUniqueKeys;
+
   CellSet(final CellComparator c) {
     this.delegatee = new ConcurrentSkipListMap<>(c);
+    this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
+  }
+
+  CellSet(final NavigableMap<Cell, Cell> m, int numUniqueKeys) {
+    this.delegatee = m;
+    this.numUniqueKeys = numUniqueKeys;
   }
 
+  @VisibleForTesting
   CellSet(final NavigableMap<Cell, Cell> m) {
     this.delegatee = m;
+    this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
   }
 
   @VisibleForTesting
@@ -83,7 +95,7 @@ public class CellSet implements NavigableSet<Cell>  {
 
   public NavigableSet<Cell> headSet(final Cell toElement,
       boolean inclusive) {
-    return new CellSet(this.delegatee.headMap(toElement, inclusive));
+    return new CellSet(this.delegatee.headMap(toElement, inclusive), UNKNOWN_NUM_UNIQUES);
   }
 
   public Cell higher(Cell e) {
@@ -120,7 +132,7 @@ public class CellSet implements NavigableSet<Cell>  {
   }
 
   public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
-    return new CellSet(this.delegatee.tailMap(fromElement, inclusive));
+    return new CellSet(this.delegatee.tailMap(fromElement, inclusive), UNKNOWN_NUM_UNIQUES);
   }
 
   public Comparator<? super Cell> comparator() {
@@ -187,4 +199,8 @@ public class CellSet implements NavigableSet<Cell>  {
   public <T> T[] toArray(T[] a) {
     throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
   }
+
+  public int getNumUniqueKeys() {
+    return numUniqueKeys;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e6f9451..d250252 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,13 +67,13 @@ public class CompactingMemStore extends AbstractMemStore {
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
-  private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25;
+  private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.02;
 
   private static final Log LOG = LogFactory.getLog(CompactingMemStore.class);
   private HStore store;
   private RegionServicesForStores regionServices;
   private CompactionPipeline pipeline;
-  private MemStoreCompactor compactor;
+  protected MemStoreCompactor compactor;
 
   private long inmemoryFlushSize;       // the threshold on active size for in-memory flush
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
@@ -81,7 +82,7 @@ public class CompactingMemStore extends AbstractMemStore {
   private boolean inWalReplay = false;
 
   @VisibleForTesting
-  private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;
 
   /**
@@ -119,7 +120,8 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   @VisibleForTesting
-  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
+  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
+      throws IllegalArgumentIOException {
     return new MemStoreCompactor(this, compactionPolicy);
   }
 
@@ -205,6 +207,7 @@ public class CompactingMemStore extends AbstractMemStore {
       } else {
         pushTailToSnapshot();
       }
+      compactor.resetStats();
     }
     return new MemStoreSnapshot(snapshotId, this.snapshot);
   }
@@ -298,10 +301,6 @@ public class CompactingMemStore extends AbstractMemStore {
     this.compositeSnapshot = useCompositeSnapshot;
   }
 
-  public boolean isCompositeSnapshot() {
-    return this.compositeSnapshot;
-  }
-
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
       boolean merge) {
     // last true stands for updating the region size
@@ -313,8 +312,8 @@ public class CompactingMemStore extends AbstractMemStore {
    *           with version taken earlier. This version must be passed as a parameter here.
    *           The flattening happens only if versions match.
    */
-  public void flattenOneSegment(long requesterVersion) {
-    pipeline.flattenOneSegment(requesterVersion, indexType);
+  public void flattenOneSegment(long requesterVersion,  MemStoreCompactionStrategy.Action action) {
+    pipeline.flattenOneSegment(requesterVersion, indexType, action);
   }
 
   // setter is used only for testability
@@ -543,29 +542,11 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
-  //----------------------------------------------------------------------
-  //methods for tests
-  //----------------------------------------------------------------------
   @VisibleForTesting
   boolean isMemStoreFlushingInMemory() {
     return inMemoryFlushInProgress.get();
   }
 
-  @VisibleForTesting
-  void disableCompaction() {
-    allowCompaction.set(false);
-  }
-
-  @VisibleForTesting
-  void enableCompaction() {
-    allowCompaction.set(true);
-  }
-
-  @VisibleForTesting
-  void initiateType(MemoryCompactionPolicy compactionType) {
-    compactor.initiateAction(compactionType);
-  }
-
   /**
    * @param cell Find the row that comes after this one.  If null, we return the
    *             first.

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e44cb45..2f479e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -126,14 +126,10 @@ public class CompactionPipeline {
       }
       suffix = versionedList.getStoreSegments();
       if (LOG.isDebugEnabled()) {
-        int count = 0;
-        if(segment != null) {
-          count = segment.getCellsCount();
-        }
         LOG.debug("Swapping pipeline suffix. "
             + "Just before the swap the number of segments in pipeline is:"
             + versionedList.getStoreSegments().size()
-            + ", and the number of cells in new segment is:" + count);
+            + ", and the new segment is:" + segment);
       }
       swapSuffix(suffix, segment, closeSuffix);
       readOnlyCopy = new LinkedList<>(pipeline);
@@ -183,7 +179,9 @@ public class CompactionPipeline {
    *
    * @return true iff a segment was successfully flattened
    */
-  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {
+  public boolean flattenOneSegment(long requesterVersion,
+      CompactingMemStore.IndexType idxType,
+      MemStoreCompactionStrategy.Action action) {
 
     if(requesterVersion != version) {
       LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@@ -201,7 +199,7 @@ public class CompactionPipeline {
         if ( s.canBeFlattened() ) {
           MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
           ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
-              (CSLMImmutableSegment)s,idxType,newMemstoreAccounting);
+              (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
           replaceAtIndex(i,newS);
           if(region != null) {
             // update the global memstore size counter

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
new file mode 100644
index 0000000..90d0756
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class EagerMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
+
+  private static final String name = "EAGER";
+  public EagerMemStoreCompactionStrategy(Configuration conf, String cfName) {
+    super(conf, cfName);
+  }
+
+  @Override
+  public Action getAction(VersionedSegmentsList versionedList) {
+    return compact(versionedList, name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4b83b23..db900a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -278,19 +278,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     }
     String className;
     switch (inMemoryCompaction) {
-    case BASIC:
-    case EAGER:
-      Class<? extends CompactingMemStore> clz =
-          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
-      className = clz.getName();
-      this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
-          this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
-      break;
-    case NONE:
-    default:
-      className = DefaultMemStore.class.getName();
-      this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
-        new Object[] { conf, this.comparator });
+      case NONE:
+        className = DefaultMemStore.class.getName();
+        this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
+            new Object[] { conf, this.comparator });
+        break;
+      default:
+        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
+          CompactingMemStore.class, CompactingMemStore.class);
+        className = clz.getName();
+        this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
+            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
     }
     LOG.info("Memstore class name is " + className);
     this.offPeakHours = OffPeakHours.getInstance(conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index c1244ff..02a05c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -40,6 +40,10 @@ public abstract class ImmutableSegment extends Segment {
   // each sub-type of immutable segment knows whether it is flat or not
   protected abstract boolean canBeFlattened();
 
+  public int getNumUniqueKeys() {
+    return getCellSet().getNumUniqueKeys();
+  }
+
   /////////////////////  CONSTRUCTORS  /////////////////////
   /**------------------------------------------------------------------------
    * Empty C-tor to be used only for CompositeImmutableSegment
@@ -64,7 +68,6 @@ public abstract class ImmutableSegment extends Segment {
     super(segment);
   }
 
-
   /////////////////////  PUBLIC METHODS  /////////////////////
 
   public int getNumOfSegments() {
@@ -75,4 +78,11 @@ public abstract class ImmutableSegment extends Segment {
     List<Segment> res = new ArrayList<>(Arrays.asList(this));
     return res;
   }
+
+  @Override
+  public String toString() {
+    String res = super.toString();
+    res += "Num uniques "+getNumUniqueKeys()+"; ";
+    return res;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
new file mode 100644
index 0000000..b262328
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * MemStoreCompactionStrategy is the root of a class hierarchy which defines the strategy for
+ * choosing the next action to apply in an (in-memory) memstore compaction.
+ * Possible action are:
+ *  - No-op - do nothing
+ *  - Flatten - to change the segment's index from CSLM to a flat representation
+ *  - Merge - to merge the indices of the segments in the pipeline
+ *  - Compact - to merge the indices while removing data redundancies
+ *
+ * In addition while applying flat/merge actions it is possible to count the number of unique
+ * keys in the result segment.
+ */
+@InterfaceAudience.Private
+public abstract class MemStoreCompactionStrategy {
+
+  protected static final Log LOG = LogFactory.getLog(MemStoreCompactionStrategy.class);
+  // The upper bound for the number of segments we store in the pipeline prior to merging.
+  public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
+      "hbase.hregion.compacting.pipeline.segments.limit";
+  public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 4;
+
+  /**
+   * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
+   * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
+   * that the youngest segment is going to be flatten anyway.
+   */
+  public enum Action {
+    NOOP,
+    FLATTEN,  // flatten a segment in the pipeline
+    FLATTEN_COUNT_UNIQUE_KEYS,  // flatten a segment in the pipeline and count its unique keys
+    MERGE,    // merge all the segments in the pipeline into one
+    MERGE_COUNT_UNIQUE_KEYS,    // merge all pipeline segments into one and count its unique keys
+    COMPACT   // compact the data of all pipeline segments
+  }
+
+  protected final String cfName;
+  // The limit on the number of the segments in the pipeline
+  protected final int pipelineThreshold;
+
+
+  public MemStoreCompactionStrategy(Configuration conf, String cfName) {
+    this.cfName = cfName;
+    if(conf == null) {
+      pipelineThreshold = COMPACTING_MEMSTORE_THRESHOLD_DEFAULT;
+    } else {
+      pipelineThreshold =         // get the limit on the number of the segments in the pipeline
+          conf.getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
+    }
+  }
+
+  // get next compaction action to apply on compaction pipeline
+  public abstract Action getAction(VersionedSegmentsList versionedList);
+  // update policy stats based on the segment that replaced previous versioned list (in
+  // compaction pipeline)
+  public void updateStats(Segment replacement) {}
+  // resets policy stats
+  public void resetStats() {}
+
+  protected Action simpleMergeOrFlatten(VersionedSegmentsList versionedList, String strategy) {
+    int numOfSegments = versionedList.getNumOfSegments();
+    if (numOfSegments > pipelineThreshold) {
+      // to avoid too many segments, merge now
+      LOG.debug(strategy+" memory compaction for store " + cfName
+          + " merging " + numOfSegments + " segments");
+      return getMergingAction();
+    }
+
+    // just flatten a segment
+    LOG.debug(strategy+" memory compaction for store " + cfName
+        + " flattening a segment in the pipeline");
+    return getFlattenAction();
+  }
+
+  protected Action getMergingAction() {
+    return Action.MERGE;
+  }
+
+  protected Action getFlattenAction() {
+    return Action.FLATTEN;
+  }
+
+  protected Action compact(VersionedSegmentsList versionedList, String strategyInfo) {
+    int numOfSegments = versionedList.getNumOfSegments();
+    LOG.debug(strategyInfo+" memory compaction for store " + cfName
+        + " compacting " + numOfSegments + " segments");
+    return Action.COMPACT;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 8af33b6..fea9f17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -44,26 +46,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @InterfaceAudience.Private
 public class MemStoreCompactor {
 
-  // The upper bound for the number of segments we store in the pipeline prior to merging.
-  // This constant is subject to further experimentation.
-  // The external setting of the compacting MemStore behaviour
-  public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
-      "hbase.hregion.compacting.pipeline.segments.limit";
-  // remaining with the same ("infinity") but configurable default for now
-  public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 1;
-
   public static final long DEEP_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT
-          + 4 * ClassSize.REFERENCE
-          // compactingMemStore, versionedList, action, isInterrupted (the reference)
+      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
+          // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
           // "action" is an enum and thus it is a class with static final constants,
           // so counting only the size of the reference to it and not the size of the internals
-          + 2 * Bytes.SIZEOF_INT        // compactionKVMax, pipelineThreshold
+          + Bytes.SIZEOF_INT        // compactionKVMax
           + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
       );
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
-  private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
   private CompactingMemStore compactingMemStore;
 
   // a static version of the segment list from the pipeline
@@ -75,29 +67,15 @@ public class MemStoreCompactor {
   // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
   private final int compactionKVMax;
 
-  /**
-   * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
-   * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
-   * that the youngest segment is going to be flatten anyway.
-   */
-  public enum Action {
-    NOOP,
-    FLATTEN,  // flatten the youngest segment in the pipeline
-    MERGE,    // merge all the segments in the pipeline into one
-    COMPACT   // copy-compact the data of all the segments in the pipeline
-  }
-
-  private Action action = Action.FLATTEN;
+  private MemStoreCompactionStrategy strategy;
 
   public MemStoreCompactor(CompactingMemStore compactingMemStore,
-      MemoryCompactionPolicy compactionPolicy) {
+      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
     this.compactingMemStore = compactingMemStore;
     this.compactionKVMax = compactingMemStore.getConfiguration()
         .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
-    initiateAction(compactionPolicy);
-    pipelineThreshold =         // get the limit on the number of the segments in the pipeline
-        compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
-            COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
+    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
+        compactingMemStore.getFamilyName());
   }
 
   /**----------------------------------------------------------------------
@@ -132,11 +110,9 @@ public class MemStoreCompactor {
       isInterrupted.compareAndSet(false, true);
   }
 
-  /**----------------------------------------------------------------------
-   * The interface to check whether user requested the index-compaction
-   */
-  public boolean isIndexCompaction() {
-    return (action == Action.MERGE);
+
+  public void resetStats() {
+    strategy.resetStats();
   }
 
   /**----------------------------------------------------------------------
@@ -149,40 +125,6 @@ public class MemStoreCompactor {
   }
 
   /**----------------------------------------------------------------------
-   * Decide what to do with the new and old segments in the compaction pipeline.
-   * Implements basic in-memory compaction policy.
-   */
-  private Action policy() {
-
-    if (isInterrupted.get()) {      // if the entire process is interrupted cancel flattening
-      return Action.NOOP;           // the compaction also doesn't start when interrupted
-    }
-
-    if (action == Action.COMPACT) { // compact according to the user request
-      LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
-          + " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
-          + " cells before compaction is " + versionedList.getNumOfCells());
-      return Action.COMPACT;
-    }
-
-    // compaction shouldn't happen or doesn't worth it
-    // limit the number of the segments in the pipeline
-    int numOfSegments = versionedList.getNumOfSegments();
-    if (numOfSegments > pipelineThreshold) {
-      LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
-          + " is going to be merged to the " + compactingMemStore.getIndexType()
-          + ", as there are " + numOfSegments + " segments");
-      return Action.MERGE;          // to avoid too many segments, merge now
-    }
-
-    // if nothing of the above, then just flatten the newly joined segment
-    LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
-        + compactingMemStore.getFamilyName() + " is going to be flattened to the "
-        + compactingMemStore.getIndexType());
-    return Action.FLATTEN;
-  }
-
-  /**----------------------------------------------------------------------
   * The worker thread performs the compaction asynchronously.
   * The solo (per compactor) thread only reads the compaction pipeline.
   * There is at most one thread per memstore instance.
@@ -190,29 +132,37 @@ public class MemStoreCompactor {
   private void doCompaction() {
     ImmutableSegment result = null;
     boolean resultSwapped = false;
-    Action nextStep = null;
-    try {
-      nextStep = policy();
+    if (isInterrupted.get()) {      // if the entire process is interrupted cancel flattening
+      return;           // the compaction also doesn't start when interrupted
+    }
 
-      if (nextStep == Action.NOOP) {
+    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
+    boolean merge =
+        (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
+            nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
+    try {
+      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
         return;
       }
-      if (nextStep == Action.FLATTEN) {
-        // Youngest Segment in the pipeline is with SkipList index, make it flat
-        compactingMemStore.flattenOneSegment(versionedList.getVersion());
+      if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN ||
+          nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+        // some Segment in the pipeline is with SkipList index, make it flat
+        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
         return;
       }
 
       // Create one segment representing all segments in the compaction pipeline,
       // either by compaction or by merge
       if (!isInterrupted.get()) {
-        result = createSubstitution();
+        result = createSubstitution(nextStep);
       }
 
       // Substitute the pipeline with one segment
       if (!isInterrupted.get()) {
         if (resultSwapped = compactingMemStore.swapCompactedSegments(
-            versionedList, result, (action==Action.MERGE))) {
+            versionedList, result, merge)) {
+          // update compaction strategy
+          strategy.updateStats(result);
           // update the wal so it can be truncated and not get too long
           compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
         }
@@ -226,10 +176,8 @@ public class MemStoreCompactor {
       // we DON'T need to close the result segment (meaning its MSLAB)!
       // Because closing the result segment means closing the chunks of all segments
       // in the compaction pipeline, which still have ongoing scans.
-      if (nextStep != Action.MERGE) {
-        if ((result != null) && (!resultSwapped)) {
-          result.close();
-        }
+      if (!merge && (result != null) && !resultSwapped) {
+        result.close();
       }
       releaseResources();
     }
@@ -240,7 +188,8 @@ public class MemStoreCompactor {
    * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
    * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
    */
-  private ImmutableSegment createSubstitution() throws IOException {
+  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
+      IOException {
 
     ImmutableSegment result = null;
     MemStoreSegmentsIterator iterator = null;
@@ -248,21 +197,24 @@ public class MemStoreCompactor {
     switch (action) {
       case COMPACT:
         iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
-            compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());
+            compactingMemStore.getComparator(),
+            compactionKVMax, compactingMemStore.getStore());
 
         result = SegmentFactory.instance().createImmutableSegmentByCompaction(
           compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
-          versionedList.getNumOfCells(), compactingMemStore.getIndexType());
+          versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
         iterator.close();
         break;
       case MERGE:
-        iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
+      case MERGE_COUNT_UNIQUE_KEYS:
+        iterator =
+            new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
             compactingMemStore.getComparator(), compactionKVMax);
 
         result = SegmentFactory.instance().createImmutableSegmentByMerge(
           compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
           versionedList.getNumOfCells(), versionedList.getStoreSegments(),
-          compactingMemStore.getIndexType());
+          compactingMemStore.getIndexType(), action);
         iterator.close();
         break;
       default:
@@ -272,21 +224,22 @@ public class MemStoreCompactor {
     return result;
   }
 
-  /**----------------------------------------------------------------------
-   * Initiate the action according to user config, after its default is Action.MERGE
-   */
   @VisibleForTesting
-  void initiateAction(MemoryCompactionPolicy compType) {
+  void initiateCompactionStrategy(MemoryCompactionPolicy compType,
+      Configuration configuration, String cfName) throws IllegalArgumentIOException {
+
+    assert (compType !=MemoryCompactionPolicy.NONE);
 
     switch (compType){
-    case NONE: action = Action.NOOP;
-      break;
-    case BASIC: action = Action.MERGE;
-      break;
-    case EAGER: action = Action.COMPACT;
-      break;
-    default:
-      throw new RuntimeException("Unknown memstore type " + compType); // sanity check
+      case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
+        break;
+      case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
+        break;
+      case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
+        break;
+      default:
+        // sanity check
+        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 43836f4..db0b319 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -51,13 +51,13 @@ public final class SegmentFactory {
   // for compaction
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      CompactingMemStore.IndexType idxType)
+      CompactingMemStore.IndexType idxType, MemStoreCompactionStrategy.Action action)
       throws IOException {
 
     MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
     return
         createImmutableSegment(
-            conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
+            conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
   }
 
   // create empty immutable segment
@@ -82,13 +82,14 @@ public final class SegmentFactory {
   // for merge
   public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
+      List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType,
+      MemStoreCompactionStrategy.Action action)
       throws IOException {
 
     MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
     return
         createImmutableSegment(
-            conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
+            conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
 
   }
 
@@ -96,18 +97,18 @@ public final class SegmentFactory {
   // for flattening
   public ImmutableSegment createImmutableSegmentByFlattening(
       CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType,
-      MemStoreSizing memstoreSizing) {
+      MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
     ImmutableSegment res = null;
     switch (idxType) {
-    case CHUNK_MAP:
-      res = new CellChunkImmutableSegment(segment, memstoreSizing);
-      break;
-    case CSLM_MAP:
-      assert false; // non-flat segment can not be the result of flattening
-      break;
-    case ARRAY_MAP:
-      res = new CellArrayImmutableSegment(segment, memstoreSizing);
-      break;
+      case CHUNK_MAP:
+        res = new CellChunkImmutableSegment(segment, memstoreSizing, action);
+        break;
+      case CSLM_MAP:
+        assert false; // non-flat segment can not be the result of flattening
+        break;
+      case ARRAY_MAP:
+        res = new CellArrayImmutableSegment(segment, memstoreSizing, action);
+        break;
     }
     return res;
   }
@@ -116,7 +117,7 @@ public final class SegmentFactory {
   //****** private methods to instantiate concrete store segments **********//
   private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
       MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
-      MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
+      MemStoreCompactionStrategy.Action action, CompactingMemStore.IndexType idxType) {
 
     ImmutableSegment res = null;
     switch (idxType) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 4269863..a697912 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.List;
 
 import org.apache.yetus.audience.InterfaceAudience;
@@ -62,4 +64,29 @@ public class VersionedSegmentsList {
   public int getNumOfSegments() {
     return storeSegments.size();
   }
+
+  // Estimates fraction of unique keys
+  @VisibleForTesting
+  double getEstimatedUniquesFrac() {
+    int segmentCells = 0;
+    int maxCells = 0;
+    double est = 0;
+
+    for (ImmutableSegment s : storeSegments) {
+      double segmentUniques = s.getNumUniqueKeys();
+      if(segmentUniques != CellSet.UNKNOWN_NUM_UNIQUES) {
+        segmentCells = s.getCellsCount();
+        if(segmentCells > maxCells) {
+          maxCells = segmentCells;
+          est = segmentUniques / segmentCells;
+        }
+      }
+      // else ignore this segment specifically since if the unique number is unknown counting
+      // cells can be expensive
+    }
+    if(maxCells == 0) {
+      return 1.0;
+    }
+    return est;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index b759261..2ef200f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -368,7 +368,7 @@ public class TestIOFencing {
           Thread.sleep(1000);
         }
       }
-      if (policy == MemoryCompactionPolicy.EAGER) {
+      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
         assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
       } else {
         assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index b44fd34..0d5290d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -26,8 +26,23 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -73,7 +88,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Before
   public void setUp() throws Exception {
     compactingSetUp();
-    this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl.COMPARATOR,
+    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl
+        .COMPARATOR,
         store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
   }
 
@@ -482,7 +498,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
 
     byte[] row = Bytes.toBytes("testrow");
     byte[] fam = Bytes.toBytes("testfamily");
@@ -497,7 +513,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
 
     // Creating a pipeline
-    ((CompactingMemStore)memstore).disableCompaction();
+    ((MyCompactingMemStore)memstore).disableCompaction();
     ((CompactingMemStore)memstore).flushInMemory();
 
     // Adding value to "new" memstore
@@ -512,7 +528,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     List<KeyValueScanner> scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
-    ((CompactingMemStore)memstore).enableCompaction();
+    ((MyCompactingMemStore)memstore).enableCompaction();
     // trigger compaction
     ((CompactingMemStore)memstore).flushInMemory();
 
@@ -574,7 +590,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration()
         .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
 
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
@@ -613,7 +629,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
+        String.valueOf(1));
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
@@ -669,7 +687,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
@@ -698,7 +716,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
     assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore) memstore).disableCompaction();
+    ((MyCompactingMemStore) memstore).disableCompaction();
     MemStoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -714,7 +732,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
         + 3 * oneCellOnCSLMHeapSize;
     assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore)memstore).enableCompaction();
+    ((MyCompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -738,6 +756,67 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.clearSnapshot(snapshot.getId());
   }
 
+  @Test
+  public void testMagicCompaction3Buckets() throws IOException {
+
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    memstore.getConfiguration().setDouble(
+        AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
+    memstore.getConfiguration().setInt(
+        AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
+    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
+    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
+
+    String[] keys1 = { "A", "B", "D" };
+    String[] keys2 = { "A" };
+    String[] keys3 = { "A", "A", "B", "C" };
+    String[] keys4 = { "D", "B", "B" };
+
+    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
+    int oneCellOnCSLMHeapSize = 120;
+    assertEquals(totalCellsLen1, region.getMemStoreSize());
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
+    assertEquals(totalHeapSize, memstore.heapSize());
+
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
+    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+    assertEquals(1.0,
+        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+    assertEquals(1.0,
+        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+    assertEquals((4.0 / 8.0),
+        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
+    assertTrue(4 == numCells || 11 == numCells);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    MemStoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemStoreSize(size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    numCells = s.getCellsCount();
+    assertTrue(4 == numCells || 11 == numCells);
+    assertEquals(0, regionServicesForStores.getMemStoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
   protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");
@@ -771,4 +850,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       }
     }
 
+  static protected class MyCompactingMemStore extends CompactingMemStore {
+
+    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
+        RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
+        throws IOException {
+      super(conf, c, store, regionServices, compactionPolicy);
+    }
+
+    void disableCompaction() {
+      allowCompaction.set(false);
+    }
+
+    void enableCompaction() {
+      allowCompaction.set(true);
+    }
+    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
+        throws IllegalArgumentIOException {
+      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index 1c58723..b4cf4ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -21,7 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -76,7 +80,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
         String.valueOf(MemoryCompactionPolicy.EAGER));
 
     this.memstore =
-        new CompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
+        new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
             regionServicesForStores, MemoryCompactionPolicy.EAGER);
   }
 
@@ -229,7 +233,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
     assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore) memstore).disableCompaction();
+    ((MyCompactingMemStore) memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
     totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
@@ -244,7 +248,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
         ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore) memstore).enableCompaction();
+    ((MyCompactingMemStore) memstore).enableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
@@ -293,7 +297,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     addRowsByKeysDataSize(memstore, keys1);
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@@ -311,7 +315,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     }
     assertEquals(12, counter2);
 
-    ((CompactingMemStore) memstore).disableCompaction();
+    ((MyCompactingMemStore) memstore).disableCompaction();
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -330,7 +334,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     }
     assertEquals(16, counter4);
 
-    ((CompactingMemStore) memstore).enableCompaction();
+    ((MyCompactingMemStore) memstore).enableCompaction();
 
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
@@ -372,7 +376,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     testTimeRange(false);
   }
 
@@ -603,7 +607,6 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     assertTrue(chunkCount > 0);
   }
 
-
   @Test
   public void testFlatteningToCellChunkMap() throws IOException {
 
@@ -611,7 +614,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
         String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
     ((CompactingMemStore)memstore).setIndexType();