You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2017/03/01 15:08:37 UTC

hbase git commit: HBASE-17662 Disable in-memory flush when replaying from WAL

Repository: hbase
Updated Branches:
  refs/heads/master 4a5eba5e5 -> 613bcb362


HBASE-17662 Disable in-memory flush when replaying from WAL

Signed-off-by: anoopsamjohn <an...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/613bcb36
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/613bcb36
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/613bcb36

Branch: refs/heads/master
Commit: 613bcb3622ecb1783c030f34ea2975280e1c43c1
Parents: 4a5eba5
Author: anastas <an...@yahoo-inc.com>
Authored: Wed Mar 1 10:01:30 2017 +0200
Committer: anoopsamjohn <an...@gmail.com>
Committed: Wed Mar 1 20:37:54 2017 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    |  5 ++-
 .../hbase/regionserver/CompactingMemStore.java  | 43 ++++++++++++++++----
 .../hbase/regionserver/DefaultMemStore.java     |  3 ++
 .../hadoop/hbase/regionserver/HRegion.java      | 21 +++++++---
 .../hadoop/hbase/regionserver/HStore.java       | 16 ++++++++
 .../hadoop/hbase/regionserver/MemStore.java     | 12 ++++++
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  6 +--
 .../regionserver/wal/AbstractTestWALReplay.java |  2 -
 8 files changed, 88 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 225dd73..6c7886f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -54,8 +54,9 @@ public abstract class AbstractMemStore implements MemStore {
   // Used to track when to flush
   private volatile long timeOfOldestEdit;
 
-  public final static long FIXED_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
+  public final static long FIXED_OVERHEAD = ClassSize.OBJECT
+          + (4 * ClassSize.REFERENCE)
+          + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
 
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e7f4a67..312e9fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -71,16 +71,22 @@ public class CompactingMemStore extends AbstractMemStore {
 
   private long inmemoryFlushSize;       // the threshold on active size for in-memory flush
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
+
+  // inWalReplay is true while we are synchronously replaying the edits from WAL
+  private boolean inWalReplay = false;
+
   @VisibleForTesting
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;
 
-  public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
-      + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
-                                // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
-      + Bytes.SIZEOF_LONG // inmemoryFlushSize
+
+  public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
+      + 6 * ClassSize.REFERENCE     // Store, RegionServicesForStores, CompactionPipeline,
+                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+      + Bytes.SIZEOF_LONG           // inmemoryFlushSize
+      + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
       + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
-      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD;
+      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
 
   public CompactingMemStore(Configuration conf, CellComparator c,
       HStore store, RegionServicesForStores regionServices,
@@ -232,6 +238,24 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  /**
+   * Informs the MemStore that the updates that follow are edits being
+   * replayed from the WAL
+   */
+  @Override
+  public void startReplayingFromWAL() {
+    inWalReplay = true;
+  }
+
+  /**
+   * Informs the MemStore that replaying the edits from the WAL
+   * is done
+   */
+  @Override
+  public void stopReplayingFromWAL() {
+    inWalReplay = false;
+  }
+
   // the getSegments() method is used for tests only
   @VisibleForTesting
   @Override
@@ -388,9 +412,12 @@ public class CompactingMemStore extends AbstractMemStore {
 
   private boolean shouldFlushInMemory() {
     if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
-        // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
-        // the insert of the active into the compaction pipeline
-        return (inMemoryFlushInProgress.compareAndSet(false,true));
+      if (inWalReplay) {  // when replaying edits from WAL there is no need in in-memory flush
+        return false;     // regardless the size
+      }
+      // the inMemoryFlushInProgress is CASed to be true here in order to mutually exclude
+      // the insert of the active into the compaction pipeline
+      return (inMemoryFlushInProgress.compareAndSet(false,true));
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 63af570..a31c2c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 public class DefaultMemStore extends AbstractMemStore {
   private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
 
+  public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD);
+  public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD);
   /**
    * Default constructor. Used for tests.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a4dc974..cc32179 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -878,11 +878,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long maxSeqId = initializeStores(reporter, status);
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
-      // Recover any edits if available.
-      maxSeqId = Math.max(maxSeqId,
-        replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
-      // Make sure mvcc is up to max.
-      this.mvcc.advanceTo(maxSeqId);
+      List<Store> stores = this.getStores();  // update the stores that we are replaying
+      try {
+        for (Store store : stores) {
+          ((HStore) store).startReplayingFromWAL();
+        }
+        // Recover any edits if available.
+        maxSeqId = Math.max(maxSeqId,
+            replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+        // Make sure mvcc is up to max.
+        this.mvcc.advanceTo(maxSeqId);
+      } finally {
+        for (Store store : stores) {            // update the stores that we are done replaying
+          ((HStore)store).stopReplayingFromWAL();
+        }
+      }
+
     }
     this.lastReplayedOpenRegionSeqId = maxSeqId;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4d2fea8..b74e635 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -663,6 +663,22 @@ public class HStore implements Store {
   }
 
   /**
+   * Informs the MemStore that the updates that follow are edits being
+   * replayed from the WAL
+   */
+  public void startReplayingFromWAL(){
+    this.memstore.startReplayingFromWAL();
+  }
+
+  /**
+   * Informs the MemStore that replaying the edits from the WAL
+   * is done
+   */
+  public void stopReplayingFromWAL(){
+    this.memstore.stopReplayingFromWAL();
+  }
+
+  /**
    * Adds a value to the memstore
    * @param cell
    * @param memstoreSize

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 38d3e44..8b0ad19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -127,4 +127,16 @@ public interface MemStore {
 
   /* Return true if the memstore may use some extra memory space*/
   boolean isSloppy();
+
+  /**
+   * Informs the MemStore that the updates that follow are edits being
+   * replayed from the WAL
+   */
+  default void startReplayingFromWAL(){return;}
+
+  /**
+   * Informs the MemStore that replaying the edits from the WAL
+   * is done
+   */
+  default void stopReplayingFromWAL(){return;}
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index ceaadbe..2f33859 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -324,10 +324,10 @@ public class TestHeapSize  {
     expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     expected += ClassSize.estimateBase(CompactionPipeline.class, false);
-    expected += ClassSize.estimateBase(LinkedList.class, false);
-    expected += ClassSize.estimateBase(LinkedList.class, false);
+    expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline
+    expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline
     expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
-    expected += ClassSize.estimateBase(AtomicBoolean.class, false);
+    expected += ClassSize.estimateBase(AtomicBoolean.class, false);// inside MemStoreCompactor
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicBoolean.class, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/613bcb36/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 9ac07d7..90eacf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -127,8 +127,6 @@ public abstract class AbstractTestWALReplay {
     Configuration conf = TEST_UTIL.getConfiguration();
     // The below config supported by 0.20-append and CDH3b2
     conf.setInt("dfs.client.block.recovery.retries", 2);
-    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(MemoryCompactionPolicy.NONE));
     TEST_UTIL.startMiniCluster(3);
     Path hbaseRootDir =
       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));