You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/07/06 13:24:57 UTC

hbase git commit: HBASE-16162 Compacting Memstore : unnecessary push of active segments to pipeline.

Repository: hbase
Updated Branches:
  refs/heads/master ae92668dd -> 581d2b7de


HBASE-16162 Compacting Memstore : unnecessary push of active segments to pipeline.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/581d2b7d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/581d2b7d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/581d2b7d

Branch: refs/heads/master
Commit: 581d2b7de517ee29b81b62c521ef5ca27c41f38d
Parents: ae92668
Author: anoopsjohn <an...@gmail.com>
Authored: Wed Jul 6 18:54:35 2016 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Wed Jul 6 18:54:35 2016 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  | 64 +++++++++++---------
 .../hbase/regionserver/MemStoreCompactor.java   |  1 -
 2 files changed, 35 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/581d2b7d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 5e286de..e27acce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -67,7 +67,6 @@ public class CompactingMemStore extends AbstractMemStore {
   // the threshold on active size for in-memory flush
   private long inmemoryFlushSize;
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
-  @VisibleForTesting
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
 
   public CompactingMemStore(Configuration conf, CellComparator c,
@@ -199,10 +198,6 @@ public class CompactingMemStore extends AbstractMemStore {
     return list;
   }
 
-  public void setInMemoryFlushInProgress(boolean inMemoryFlushInProgress) {
-    this.inMemoryFlushInProgress.set(inMemoryFlushInProgress);
-  }
-
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList,
       ImmutableSegment result) {
     return pipeline.swap(versionedList, result);
@@ -275,31 +270,38 @@ public class CompactingMemStore extends AbstractMemStore {
   // otherwise there is a deadlock
   @VisibleForTesting
   void flushInMemory() throws IOException {
-    // Phase I: Update the pipeline
-    getRegionServices().blockUpdates();
+    // Set the inMemoryFlushInProgress flag again in case this method is invoked directly
+    // (only in tests). In the common path the flag is already true, so setting it again is
+    // idempotent. Compaction is executed speculatively and may be interrupted if a flush is
+    // forced while the compaction is in progress.
+    inMemoryFlushInProgress.set(true);
     try {
-      MutableSegment active = getActive();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, "
-            + "and initiating compaction.");
+      // Phase I: Update the pipeline
+      getRegionServices().blockUpdates();
+      try {
+        MutableSegment active = getActive();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, "
+              + "and initiating compaction.");
+        }
+        pushActiveToPipeline(active);
+      } finally {
+        getRegionServices().unblockUpdates();
       }
-      pushActiveToPipeline(active);
-    } finally {
-      getRegionServices().unblockUpdates();
-    }
-    // Phase II: Compact the pipeline
-    try {
-      if (allowCompaction.get() && inMemoryFlushInProgress.compareAndSet(false, true)) {
-        // setting the inMemoryFlushInProgress flag again for the case this method is invoked
-        // directly (only in tests) in the common path setting from true to true is idempotent
-        // Speculative compaction execution, may be interrupted if flush is forced while
-        // compaction is in progress
+      // Used by tests
+      if (!allowCompaction.get()) {
+        return;
+      }
+      // Phase II: Compact the pipeline
+      try {
         compactor.startCompaction();
+      } catch (IOException e) {
+        LOG.warn("Unable to run memstore compaction. region "
+            + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: "
+            + getFamilyName(), e);
       }
-    } catch (IOException e) {
-      LOG.warn("Unable to run memstore compaction. region "
-          + getRegionServices().getRegionInfo().getRegionNameAsString()
-          + "store: "+ getFamilyName(), e);
+    } finally {
+      inMemoryFlushInProgress.set(false);
     }
   }
 
@@ -312,9 +314,9 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   private boolean shouldFlushInMemory() {
-    if(getActive().getSize() > inmemoryFlushSize) {
+    if (getActive().getSize() > inmemoryFlushSize) {
       // size above flush threshold
-      return (allowCompaction.get() && !inMemoryFlushInProgress.get());
+      return inMemoryFlushInProgress.compareAndSet(false, true);
     }
     return false;
   }
@@ -361,7 +363,8 @@ public class CompactingMemStore extends AbstractMemStore {
   */
   private class InMemoryFlushRunnable implements Runnable {
 
-    @Override public void run() {
+    @Override
+    public void run() {
       try {
         flushInMemory();
       } catch (IOException e) {
@@ -375,14 +378,17 @@ public class CompactingMemStore extends AbstractMemStore {
   //----------------------------------------------------------------------
   //methods for tests
   //----------------------------------------------------------------------
+  @VisibleForTesting
   boolean isMemStoreFlushingInMemory() {
     return inMemoryFlushInProgress.get();
   }
 
+  @VisibleForTesting
   void disableCompaction() {
     allowCompaction.set(false);
   }
 
+  @VisibleForTesting
   void enableCompaction() {
     allowCompaction.set(true);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/581d2b7d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index d86cd32..65d3af6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -153,7 +153,6 @@ class MemStoreCompactor {
       return;
     } finally {
       releaseResources();
-      compactingMemStore.setInMemoryFlushInProgress(false);
     }
 
   }