You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/05 12:38:42 UTC

hbase git commit: HBASE-19095 Add CP hooks in RegionObserver for in memory compaction

Repository: hbase
Updated Branches:
  refs/heads/master 3a0f59d03 -> 28cdf4afb


HBASE-19095 Add CP hooks in RegionObserver for in memory compaction


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28cdf4af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28cdf4af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28cdf4af

Branch: refs/heads/master
Commit: 28cdf4afb8a864c0cfccf278960a6597708f7dd6
Parents: 3a0f59d
Author: zhangduo <zh...@apache.org>
Authored: Sat Nov 4 21:45:39 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Nov 5 20:28:05 2017 +0800

----------------------------------------------------------------------
 .../example/WriteHeavyIncrementObserver.java    |  14 ++
 .../TestWriteHeavyIncrementObserver.java        |  75 +----------
 ...IncrementObserverWithMemStoreCompaction.java |  59 +++++++++
 .../WriteHeavyIncrementObserverTestBase.java    | 105 +++++++++++++++
 .../hbase/coprocessor/RegionObserver.java       |  55 +++++++-
 .../hbase/regionserver/MemStoreCompactor.java   |  14 +-
 .../MemStoreCompactorSegmentsIterator.java      | 129 +++++++++++--------
 .../regionserver/RegionCoprocessorHost.java     |  55 ++++++++
 .../hadoop/hbase/regionserver/StoreScanner.java |   4 +-
 9 files changed, 377 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
index e9b590d..55d9ac3 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
@@ -163,6 +163,20 @@ public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObs
   }
 
   @Override
+  public void preMemStoreCompactionCompactScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+      throws IOException {
+    options.readAllVersions();
+  }
+
+  @Override
+  public InternalScanner preMemStoreCompactionCompact(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
+      throws IOException {
+    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
+  }
+
+  @Override
   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
       throws IOException {
     Scan scan =

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
index 1881c85..18e819f 100644
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
@@ -20,58 +20,25 @@ package org.apache.hadoop.hbase.coprocessor.example;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ CoprocessorTests.class, MediumTests.class })
-public class TestWriteHeavyIncrementObserver {
-
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static TableName NAME = TableName.valueOf("TestCP");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static byte[] ROW = Bytes.toBytes("row");
-
-  private static byte[] CQ1 = Bytes.toBytes("cq1");
-
-  private static byte[] CQ2 = Bytes.toBytes("cq2");
-
-  private static Table TABLE;
-
-  private static long UPPER = 1000;
-
-  private static int THREADS = 10;
+public class TestWriteHeavyIncrementObserver extends WriteHeavyIncrementObserverTestBase {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
-    UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
-    UTIL.startMiniCluster(3);
+    WriteHeavyIncrementObserverTestBase.setUp();
     UTIL.getAdmin()
         .createTable(TableDescriptorBuilder.newBuilder(NAME)
             .addCoprocessor(WriteHeavyIncrementObserver.class.getName())
@@ -79,45 +46,9 @@ public class TestWriteHeavyIncrementObserver {
     TABLE = UTIL.getConnection().getTable(NAME);
   }
 
-  @AfterClass
-  public static void tearDown() throws Exception {
-    if (TABLE != null) {
-      TABLE.close();
-    }
-    UTIL.shutdownMiniCluster();
-  }
-
-  private static void increment() throws IOException {
-    for (long i = 1; i <= UPPER; i++) {
-      TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
-      try {
-        Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
-      } catch (InterruptedException e) {
-      }
-    }
-  }
-
-  private void assertSum() throws IOException {
-    Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
-    assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
-    assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
-  }
-
   @Test
   public void test() throws Exception {
-    Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
-      try {
-        increment();
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
-      }
-    }, "increment-" + i)).toArray(Thread[]::new);
-    for (Thread thread : threads) {
-      thread.start();
-    }
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doIncrement(0);
     assertSum();
     // we do not hack scan operation so using scan we could get the original values added into the
     // table.

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java
new file mode 100644
index 0000000..eeb1fa8
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestWriteHeavyIncrementObserverWithMemStoreCompaction
+    extends WriteHeavyIncrementObserverTestBase {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    WriteHeavyIncrementObserverTestBase.setUp();
+    UTIL.getAdmin()
+        .createTable(TableDescriptorBuilder.newBuilder(NAME)
+            .addCoprocessor(WriteHeavyIncrementObserver.class.getName())
+            .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+              MemoryCompactionPolicy.EAGER.name())
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+    TABLE = UTIL.getConnection().getTable(NAME);
+  }
+
+  @Test
+  public void test() throws Exception {
+    // sleep every 10 loops to give memstore compaction enough time to finish before reaching the
+    // flush size.
+    doIncrement(10);
+    assertSum();
+    HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
+    // should have no store files created as all aggregation has been done in memory
+    assertEquals(0, store.getStorefilesCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java
new file mode 100644
index 0000000..3583230
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class WriteHeavyIncrementObserverTestBase {
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  protected static TableName NAME = TableName.valueOf("TestCP");
+
+  protected static byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected static byte[] ROW = Bytes.toBytes("row");
+
+  protected static byte[] CQ1 = Bytes.toBytes("cq1");
+
+  protected static byte[] CQ2 = Bytes.toBytes("cq2");
+
+  protected static Table TABLE;
+
+  protected static long UPPER = 1000;
+
+  protected static int THREADS = 10;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
+    UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (TABLE != null) {
+      TABLE.close();
+    }
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void increment(int sleepSteps) throws IOException {
+    for (long i = 1; i <= UPPER; i++) {
+      TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
+      if (sleepSteps > 0 && i % sleepSteps == 0) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  protected final void assertSum() throws IOException {
+    Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
+    assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
+    assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
+  }
+
+  protected final void doIncrement(int sleepSteps) throws InterruptedException {
+    Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
+      try {
+        increment(sleepSteps);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }, "increment-" + i)).toArray(Thread[]::new);
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 9546116..83cd358 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -144,10 +146,10 @@ public interface RegionObserver {
    * Called before a Store's memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @param store the store where flush is being requested
-   * @param scanner the scanner over existing data used in the store file
+   * @param scanner the scanner over existing data used in the memstore
    * @param tracker tracker used to track the life cycle of a flush
-   * @return the scanner to use during compaction.  Should not be {@code null}
-   * unless the implementation is writing new store files on its own.
+   * @return the scanner to use during flush. Should not be {@code null} unless the implementation
+   *         is writing new store files on its own.
    */
   default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
@@ -174,6 +176,51 @@ public interface RegionObserver {
       StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
 
   /**
+   * Called before in-memory compaction starts.
+   * @param c the environment provided by the region server
+   * @param store the store where in memory compaction is being requested
+   */
+  default void preMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
+      throws IOException {}
+
+  /**
+   * Called before we open store scanner for in memory compaction. You can use the {@code options}
+   * to change max versions and TTL for the scanner being opened. Notice that this method will only
+   * be called when you use {@code eager} mode. For {@code basic} mode we will not drop any cells
+   * thus we do not open a store scanner.
+   * @param c the environment provided by the region server
+   * @param store the store where in memory compaction is being requested
+   * @param options used to change max versions and TTL for the scanner being opened
+   */
+  default void preMemStoreCompactionCompactScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+      throws IOException {}
+
+  /**
+   * Called before we do in memory compaction. Notice that this method will only be called when you
+   * use {@code eager} mode. For {@code basic} mode we will not drop any cells thus there is no
+   * {@link InternalScanner}.
+   * @param c the environment provided by the region server
+   * @param store the store where in memory compaction is being executed
+   * @param scanner the scanner over existing data used in the memstore segments being compacted
+   * @return the scanner to use during in memory compaction. Must be non-null.
+   */
+  @NonNull
+  default InternalScanner preMemStoreCompactionCompact(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
+      throws IOException {
+    return scanner;
+  }
+
+  /**
+   * Called after the in memory compaction is finished.
+   * @param c the environment provided by the region server
+   * @param store the store where in memory compaction is being executed
+   */
+  default void postMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
+      throws IOException {}
+
+  /**
    * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
    * available candidates. To alter the files used for compaction, you may mutate the passed in list
    * of candidates. If you remove all the candidates then the compaction will be canceled.

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index fea9f17..4d97411 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -96,8 +96,18 @@ public class MemStoreCompactor {
       LOG.debug("Starting the In-Memory Compaction for store "
           + compactingMemStore.getStore().getColumnFamilyName());
     }
-
-    doCompaction();
+    HStore store = compactingMemStore.getStore();
+    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preMemStoreCompaction(store);
+    }
+    try {
+      doCompaction();
+    } finally {
+      if (cpHost != null) {
+        cpHost.postMemStoreCompaction(store);
+      }
+    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index 7ab2fe3..0f96936 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -23,12 +23,18 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables;
+
 /**
  * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
  * and performs the scan for compaction operation meaning it is based on SQM
@@ -36,12 +42,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
 
-  private List<Cell> kvs = new ArrayList<>();
-  private boolean hasMore;
+  private static final Log LOG = LogFactory.getLog(MemStoreCompactorSegmentsIterator.class);
+
+  private final List<Cell> kvs = new ArrayList<>();
+  private boolean hasMore = true;
   private Iterator<Cell> kvsIterator;
 
   // scanner on top of pipeline scanner that uses ScanQueryMatcher
-  private StoreScanner compactingScanner;
+  private InternalScanner compactingScanner;
 
   // C-tor
   public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
@@ -56,44 +64,34 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
     // build the scanner based on Query Matcher
     // reinitialize the compacting scanner for each instance of iterator
     compactingScanner = createScanner(store, scanners);
-
-    hasMore = compactingScanner.next(kvs, scannerContext);
-
-    if (!kvs.isEmpty()) {
-      kvsIterator = kvs.iterator();
-    }
-
+    refillKVS();
   }
 
   @Override
   public boolean hasNext() {
-    if (kvsIterator == null)  { // for the case when the result is empty
+    if (kvsIterator == null) { // for the case when the result is empty
       return false;
     }
-    if (!kvsIterator.hasNext()) {
-      // refillKVS() method should be invoked only if !kvsIterator.hasNext()
-      if (!refillKVS()) {
-        return false;
-      }
-    }
-    return kvsIterator.hasNext();
+    // return true if we either have cells in the buffer or can fetch more.
+    return kvsIterator.hasNext() || refillKVS();
   }
 
   @Override
-  public Cell next()  {
-    if (kvsIterator == null)  { // for the case when the result is empty
-      return null;
+  public Cell next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
     }
-    if (!kvsIterator.hasNext()) {
-      // refillKVS() method should be invoked only if !kvsIterator.hasNext()
-      if (!refillKVS())  return null;
-    }
-    return (!hasMore) ? null : kvsIterator.next();
+    return kvsIterator.next();
   }
 
   public void close() {
-    compactingScanner.close();
+    try {
+      compactingScanner.close();
+    } catch (IOException e) {
+      LOG.warn("close store scanner failed", e);
+    }
     compactingScanner = null;
+    kvs.clear();
   }
 
   @Override
@@ -105,39 +103,64 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
    * Creates the scanner for compacting the pipeline.
    * @return the scanner
    */
-  private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
+  private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
       throws IOException {
-    // FIXME: This is the old comment 'Get all available versions'
-    // But actually if we really reset the ScanInfo to get all available versions then lots of UTs
-    // will fail
-    return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
-        store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+    InternalScanner scanner = null;
+    boolean success = false;
+    try {
+      RegionCoprocessorHost cpHost = store.getCoprocessorHost();
+      ScanInfo scanInfo;
+      if (cpHost != null) {
+        scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
+      } else {
+        scanInfo = store.getScanInfo();
+      }
+      scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
+          store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+      if (cpHost != null) {
+        InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
+        if (scannerFromCp == null) {
+          throw new CoprocessorException("Got a null InternalScanner when calling" +
+              " preMemStoreCompactionCompact which is not acceptable");
+        }
+        success = true;
+        return scannerFromCp;
+      } else {
+        success = true;
+        return scanner;
+      }
+    } finally {
+      if (!success) {
+        Closeables.close(scanner, true);
+        scanners.forEach(KeyValueScanner::close);
+      }
+    }
   }
 
-  /* Refill kev-value set (should be invoked only when KVS is empty)
-   * Returns true if KVS is non-empty */
+  /*
+   * Refill key-value set (should be invoked only when KVS is empty). Returns true if KVS is
+   * non-empty.
+   */
   private boolean refillKVS() {
-    kvs.clear();          // clear previous KVS, first initiated in the constructor
-    if (!hasMore) {       // if there is nothing expected next in compactingScanner
+    // if there is nothing expected next in compactingScanner
+    if (!hasMore) {
       return false;
     }
-
-    try {                 // try to get next KVS
-      hasMore = compactingScanner.next(kvs, scannerContext);
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    }
-
-    if (!kvs.isEmpty() ) {// is the new KVS empty ?
-      kvsIterator = kvs.iterator();
-      return true;
-    } else {
-      // KVS is empty, but hasMore still true?
-      if (hasMore) {      // try to move to next row
-        return refillKVS();
+    // clear previous KVS, first initiated in the constructor
+    kvs.clear();
+    for (;;) {
+      try {
+        hasMore = compactingScanner.next(kvs, scannerContext);
+      } catch (IOException e) {
+        // should not happen as all data are in memory
+        throw new IllegalStateException(e);
+      }
+      if (!kvs.isEmpty()) {
+        kvsIterator = kvs.iterator();
+        return true;
+      } else if (!hasMore) {
+        return false;
       }
-
     }
-    return hasMore;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 43a01ba..7c3bf85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -777,6 +777,61 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * Invoked before in memory compaction.
+   */
+  public void preMemStoreCompaction(HStore store) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.preMemStoreCompaction(this, store);
+      }
+    });
+  }
+
+  /**
+   * Invoked before creating the StoreScanner for in-memory compaction.
+   */
+  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
+    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
+      }
+    });
+    return builder.build();
+  }
+
+  /**
+   * Invoked before compacting memstore.
+   */
+  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
+      throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return scanner;
+    }
+    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
+        regionObserverGetter, scanner) {
+      @Override
+      public InternalScanner call(RegionObserver observer) throws IOException {
+        return observer.preMemStoreCompactionCompact(this, store, getResult());
+      }
+    });
+  }
+
+  /**
+   * Invoked after in memory compaction.
+   */
+  public void postMemStoreCompaction(HStore store) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.postMemStoreCompaction(this, store);
+      }
+    });
+  }
+
+  /**
    * Invoked after a memstore flush
    * @throws IOException
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 09be65c..6abca13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -268,9 +268,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private static final Scan SCAN_FOR_COMPACTION = new Scan();
 
   /**
-   * Used for compactions.
+   * Used for store file compaction and memstore compaction.
    * <p>
-   * Opens a scanner across specified StoreFiles.
+   * Opens a scanner across specified StoreFiles/MemStoreSegments.
    * @param store who we scan
    * @param scanners ancillary scanners
    * @param smallestReadPoint the readPoint that we should use for tracking versions