You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2018/08/22 14:54:34 UTC

hbase git commit: HBASE-21041 Memstore's heap size will be decreased to minus zero after flush

Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 e72848a56 -> e4c2d9fff


HBASE-21041 Memstore's heap size will be decreased to minus zero after flush


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4c2d9ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4c2d9ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4c2d9ff

Branch: refs/heads/branch-2.0
Commit: e4c2d9ffffab6c80c033ce26122fad6907ac5fc9
Parents: e72848a
Author: Allan Yang <al...@apache.org>
Authored: Wed Aug 22 22:50:26 2018 +0800
Committer: Allan Yang <al...@apache.org>
Committed: Wed Aug 22 22:50:26 2018 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    | 19 +++++++--
 .../regionserver/CSLMImmutableSegment.java      |  5 ++-
 .../hbase/regionserver/CompactingMemStore.java  |  5 +--
 .../hbase/regionserver/CompactionPipeline.java  |  9 +++-
 .../hbase/regionserver/DefaultMemStore.java     | 23 ++++++++--
 .../hadoop/hbase/regionserver/HStore.java       |  3 +-
 .../hbase/regionserver/MutableSegment.java      |  6 ++-
 .../hbase/regionserver/SegmentFactory.java      | 28 ++++++++-----
 .../hadoop/hbase/regionserver/TestHRegion.java  | 30 +++++++++++++
 .../TestHRegionWithInMemoryFlush.java           | 44 ++++++++++++++++++++
 10 files changed, 149 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index ce6d8ad..2e25ee9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -54,8 +54,10 @@ public abstract class AbstractMemStore implements MemStore {
   // Used to track when to flush
   private volatile long timeOfOldestEdit;
 
+  protected RegionServicesForStores regionServices;
+
   public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
-          + (4 * ClassSize.REFERENCE)
+          + (5 * ClassSize.REFERENCE)
           + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
 
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
@@ -74,17 +76,28 @@ public abstract class AbstractMemStore implements MemStore {
     return order - 1;
   }
 
-  protected AbstractMemStore(final Configuration conf, final CellComparator c) {
+  protected AbstractMemStore(final Configuration conf, final CellComparator c,
+      final RegionServicesForStores regionServices) {
     this.conf = conf;
     this.comparator = c;
+    this.regionServices = regionServices;
     resetActive();
     this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
     this.snapshotId = NO_SNAPSHOT_ID;
   }
 
   protected void resetActive() {
+    // Record the MutableSegment' heap overhead when initialing
+    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
     // Reset heap to not include any keys
-    this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
+    this.active = SegmentFactory.instance()
+        .createMutableSegment(conf, comparator, memstoreAccounting);
+    // regionServices can be null when testing
+    if (regionServices != null) {
+      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
+          memstoreAccounting.getHeapSize(),
+          memstoreAccounting.getOffHeapSize());
+    }
     this.timeOfOldestEdit = Long.MAX_VALUE;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
index 9e206ea..855fd08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
@@ -36,11 +36,14 @@ public class CSLMImmutableSegment extends ImmutableSegment {
    * This C-tor should be used when active MutableSegment is pushed into the compaction
    * pipeline and becomes an ImmutableSegment.
    */
-  protected CSLMImmutableSegment(Segment segment) {
+  protected CSLMImmutableSegment(Segment segment, MemStoreSizing memstoreSizing) {
     super(segment);
     // update the segment metadata heap size
     long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
     incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap
+    if (memstoreSizing != null) {
+      memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 459eeda..c9fc9a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -66,7 +66,6 @@ public class CompactingMemStore extends AbstractMemStore {
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
   private HStore store;
-  private RegionServicesForStores regionServices;
   private CompactionPipeline pipeline;
   protected MemStoreCompactor compactor;
 
@@ -93,7 +92,7 @@ public class CompactingMemStore extends AbstractMemStore {
   private IndexType indexType = IndexType.ARRAY_MAP;  // default implementation
 
   public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
-      + 7 * ClassSize.REFERENCE     // Store, RegionServicesForStores, CompactionPipeline,
+      + 6 * ClassSize.REFERENCE     // Store, CompactionPipeline,
                                     // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
                                     // indexType
       + Bytes.SIZEOF_LONG           // inmemoryFlushSize
@@ -104,7 +103,7 @@ public class CompactingMemStore extends AbstractMemStore {
   public CompactingMemStore(Configuration conf, CellComparator c,
       HStore store, RegionServicesForStores regionServices,
       MemoryCompactionPolicy compactionPolicy) throws IOException {
-    super(conf, c);
+    super(conf, c, regionServices);
     this.store = store;
     this.regionServices = regionServices;
     this.pipeline = new CompactionPipeline(getRegionServices());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 8073a61..5afaec1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -73,8 +73,15 @@ public class CompactionPipeline {
   }
 
   public boolean pushHead(MutableSegment segment) {
+    // Record the ImmutableSegment' heap overhead when initialing
+    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
     ImmutableSegment immutableSegment = SegmentFactory.instance().
-        createImmutableSegment(segment);
+        createImmutableSegment(segment, memstoreAccounting);
+    if (region != null) {
+      region.addMemStoreSize(memstoreAccounting.getDataSize(),
+          memstoreAccounting.getHeapSize(),
+          memstoreAccounting.getOffHeapSize());
+    }
     synchronized (pipeline){
       boolean res = addFirst(immutableSegment);
       readOnlyCopy = new LinkedList<>(pipeline);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index f4db666..422f1b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -63,7 +63,7 @@ public class DefaultMemStore extends AbstractMemStore {
    * Default constructor. Used for tests.
    */
   public DefaultMemStore() {
-    this(HBaseConfiguration.create(), CellComparator.getInstance());
+    this(HBaseConfiguration.create(), CellComparator.getInstance(), null);
   }
 
   /**
@@ -71,7 +71,16 @@ public class DefaultMemStore extends AbstractMemStore {
    * @param c Comparator
    */
   public DefaultMemStore(final Configuration conf, final CellComparator c) {
-    super(conf, c);
+    super(conf, c, null);
+  }
+
+  /**
+   * Constructor.
+   * @param c Comparator
+   */
+  public DefaultMemStore(final Configuration conf, final CellComparator c,
+      final RegionServicesForStores regionServices) {
+    super(conf, c, regionServices);
   }
 
   /**
@@ -88,8 +97,16 @@ public class DefaultMemStore extends AbstractMemStore {
     } else {
       this.snapshotId = EnvironmentEdgeManager.currentTime();
       if (!this.active.isEmpty()) {
+        // Record the ImmutableSegment' heap overhead when initialing
+        MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
         ImmutableSegment immutableSegment = SegmentFactory.instance().
-            createImmutableSegment(this.active);
+            createImmutableSegment(this.active, memstoreAccounting);
+        // regionServices can be null when testing
+        if (regionServices != null) {
+          regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
+              memstoreAccounting.getHeapSize(),
+              memstoreAccounting.getOffHeapSize());
+        }
         this.snapshot = immutableSegment;
         resetActive();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7adeb85..dad9346 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -331,7 +331,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     switch (inMemoryCompaction) {
       case NONE:
         ms = ReflectionUtils.newInstance(DefaultMemStore.class,
-            new Object[]{conf, this.comparator});
+            new Object[] { conf, this.comparator,
+                this.getHRegion().getRegionServicesForStores()});
         break;
       default:
         Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index c72d385..714e9bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -42,9 +42,13 @@ public class MutableSegment extends Segment {
         + ClassSize.CONCURRENT_SKIPLISTMAP
         + ClassSize.SYNC_TIMERANGE_TRACKER;
 
-  protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
+  protected MutableSegment(CellSet cellSet, CellComparator comparator,
+      MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) {
     super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
     incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
+    if (memstoreSizing != null) {
+      memstoreSizing.incMemStoreSize(0, DEEP_OVERHEAD, 0);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index db0b319..26b7ecc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -60,22 +60,30 @@ public final class SegmentFactory {
             conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
   }
 
-  // create empty immutable segment
-  // for initializations
+  /**
+   * create empty immutable segment for initializations
+   * This ImmutableSegment is used as a place holder for snapshot in Memstore.
+   * It won't flush later, So it is not necessary to record the initial size
+   * for it.
+   * @param comparator comparator
+   * @return ImmutableSegment
+   */
   public ImmutableSegment createImmutableSegment(CellComparator comparator) {
-    MutableSegment segment = generateMutableSegment(null, comparator, null);
-    return createImmutableSegment(segment);
+    MutableSegment segment = generateMutableSegment(null, comparator, null, null);
+    return createImmutableSegment(segment, null);
   }
 
   // create not-flat immutable segment from mutable segment
-  public ImmutableSegment createImmutableSegment(MutableSegment segment) {
-    return new CSLMImmutableSegment(segment);
+  public ImmutableSegment createImmutableSegment(MutableSegment segment,
+      MemStoreSizing memstoreSizing) {
+    return new CSLMImmutableSegment(segment, memstoreSizing);
   }
 
   // create mutable segment
-  public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
+  public MutableSegment createMutableSegment(final Configuration conf,
+      CellComparator comparator, MemStoreSizing memstoreSizing) {
     MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
-    return generateMutableSegment(conf, comparator, memStoreLAB);
+    return generateMutableSegment(conf, comparator, memStoreLAB, memstoreSizing);
   }
 
   // create new flat immutable segment from merging old immutable segments
@@ -135,10 +143,10 @@ public final class SegmentFactory {
   }
 
   private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
-      MemStoreLAB memStoreLAB) {
+      MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) {
     // TBD use configuration to set type of segment
     CellSet set = new CellSet(comparator);
-    return new MutableSegment(set, comparator, memStoreLAB);
+    return new MutableSegment(set, comparator, memStoreLAB, memstoreSizing);
   }
 
   private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 7502a67..51d4e66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -426,6 +426,36 @@ public class TestHRegion {
   }
 
   /**
+   * A test case of HBASE-21041
+   * @throws Exception Exception
+   */
+  @Test
+  public void testFlushAndMemstoreSizeCounting() throws Exception {
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, method);
+    try {
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        Put put = new Put(row);
+        put.addColumn(family, family, row);
+        region.put(put);
+      }
+      region.flush(true);
+      // After flush, data size should be zero
+      assertEquals(0, region.getMemStoreDataSize());
+      // After flush, a new active mutable segment is created, so the heap size
+      // should equal to MutableSegment.DEEP_OVERHEAD
+      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
+      // After flush, offheap should be zero
+      assertEquals(0, region.getMemStoreOffHeapSize());
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+      wals.close();
+    }
+  }
+
+  /**
    * Test we do not lose data if we fail a flush and then close.
    * Part of HBase-10466.  Tests the following from the issue description:
    * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4c2d9ff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
index ce83326..26b109a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
@@ -22,14 +22,20 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.Assert;
 import org.junit.ClassRule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * A test similar to TestHRegion, but with in-memory flush families.
  * Also checks wal truncation after in-memory compaction.
@@ -60,5 +66,43 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion {
     return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey,
         isReadOnly, durability, wal, inMemory, families);
   }
+
+  /**
+   * A test case of HBASE-21041
+   * @throws Exception Exception
+   */
+  @Override
+  @Test
+  public void testFlushAndMemstoreSizeCounting() throws Exception {
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, method);
+    int count = 0;
+    try {
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        Put put = new Put(row);
+        put.addColumn(family, family, row);
+        region.put(put);
+        //In memory flush every 1000 puts
+        if (count++ % 1000 == 0) {
+          ((CompactingMemStore) (region.getStore(family).memstore))
+              .flushInMemory();
+        }
+      }
+      region.flush(true);
+      // After flush, data size should be zero
+      Assert.assertEquals(0, region.getMemStoreDataSize());
+      // After flush, a new active mutable segment is created, so the heap size
+      // should equal to MutableSegment.DEEP_OVERHEAD
+      Assert.assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
+      // After flush, offheap size should be zero
+      Assert.assertEquals(0, region.getMemStoreOffHeapSize());
+
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+      wals.close();
+    }
+  }
 }