You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2018/08/22 14:09:35 UTC

hbase git commit: HBASE-21031 Memory leak if replay edits failed during region opening

Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 dcf8a2318 -> e72848a56


HBASE-21031 Memory leak if replay edits failed during region opening


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e72848a5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e72848a5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e72848a5

Branch: refs/heads/branch-2.0
Commit: e72848a56afb9b563194cda18c21f8f158b5e899
Parents: dcf8a23
Author: Allan Yang <al...@apache.org>
Authored: Wed Aug 22 22:08:33 2018 +0800
Committer: Allan Yang <al...@apache.org>
Committed: Wed Aug 22 22:08:33 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  49 ++++-
 .../hadoop/hbase/regionserver/HStore.java       |   4 +
 .../regionserver/RegionServerAccounting.java    |  52 -----
 .../regionserver/handler/OpenRegionHandler.java |  12 +-
 .../TestRecoveredEditsReplayAndAbort.java       | 214 +++++++++++++++++++
 5 files changed, 259 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ba75a2e..d640b54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -857,7 +857,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return What the next sequence (edit) id should be.
    * @throws IOException e
    */
-  private long initialize(final CancelableProgressable reporter) throws IOException {
+  @VisibleForTesting
+  long initialize(final CancelableProgressable reporter) throws IOException {
 
     //Refuse to open the region if there is no column family in the table
     if (htableDescriptor.getColumnFamilyCount() == 0) {
@@ -870,6 +871,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       nextSeqId = initializeRegionInternals(reporter, status);
       return nextSeqId;
+    } catch (IOException e) {
+      LOG.warn("Failed initialize of region= {}, starting to roll back memstore",
+          getRegionInfo().getRegionNameAsString(), e);
+      // global memstore size will be decreased when dropping memstore
+      try {
+        //drop the memory used by memstore if open region fails
+        dropMemStoreContents();
+      } catch (IOException ioE) {
+        if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
+          LOG.warn("Failed drop memstore of region= {}, "
+                  + "some chunks may not released forever since MSLAB is enabled",
+              getRegionInfo().getRegionNameAsString());
+        }
+
+      }
+      throw e;
     } finally {
       // nextSeqid will be -1 if the initialization fails.
       // At least it will be 0 otherwise.
@@ -4425,11 +4442,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
     }
-    // The edits size added into rsAccounting during this replaying will not
-    // be required any more. So just clear it.
-    if (this.rsAccounting != null) {
-      this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
-    }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
       internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
@@ -4611,9 +4623,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             editsCount++;
           }
           MemStoreSize mss = memStoreSizing.getMemStoreSize();
-          if (this.rsAccounting != null) {
-            rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
-          }
           incMemStoreSize(mss);
           flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
           if (flush) {
@@ -5056,6 +5065,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
+   * Drops ALL data in the memstore of every store of this region; use with care. Currently
+   * used to prevent a memory leak when replaying recovered.edits fails during region open.
+   * @return the total memstore size freed across all stores of this region
+   */
+  public MemStoreSize dropMemStoreContents() throws IOException {
+    MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
+    this.updatesLock.writeLock().lock();
+    try {
+      for (HStore s : stores.values()) {
+        MemStoreSize memStoreSize = doDropStoreMemStoreContentsForSeqId(s, HConstants.NO_SEQNUM);
+        LOG.info("Drop memstore for Store {} in region {}, dropped memstoresize: [{}]",
+            s.getColumnFamilyName(), this.getRegionInfo().getRegionNameAsString(),
+            memStoreSize);
+        totalFreedSize.incMemStoreSize(memStoreSize);
+      }
+      return totalFreedSize.getMemStoreSize();
+    } finally {
+      this.updatesLock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Drops the memstore contents after replaying a flush descriptor or region open event replay
    * if the memstore edits have seqNums smaller than the given seq id
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 3943de1..7adeb85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2321,6 +2321,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     @Override
     public void abort() throws IOException {
       if (snapshot != null) {
+        // Close the snapshot when aborting; otherwise its segment scanners are never closed
+        // and, when MSLAB is in use, the chunks referenced by those scanners cannot be
+        // released, which leaks memory.
+        snapshot.close();
         HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 4e66fc7..baa9a6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -19,14 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.lang.management.MemoryType;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -43,11 +40,6 @@ public class RegionServerAccounting {
   // memstore off-heap size.
   private final LongAdder globalMemStoreOffHeapSize = new LongAdder();
 
-  // Store the edits size during replaying WAL. Use this to roll back the
-  // global memstore size once a region opening failed.
-  private final ConcurrentMap<byte[], MemStoreSizing> replayEditsPerRegion =
-    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
-
   private long globalMemStoreLimit;
   private final float globalMemStoreLimitLowMarkPercent;
   private long globalMemStoreLimitLowMark;
@@ -216,48 +208,4 @@ public class RegionServerAccounting {
           getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark);
     }
   }
-
-  /***
-   * Add memStoreSize to replayEditsPerRegion.
-   *
-   * @param regionName region name.
-   * @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
-   */
-  public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) {
-    MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName);
-    // All ops on the same MemStoreSize object is going to be done by single thread, sequentially
-    // only. First calls to this method to increment the per region reply edits size and then call
-    // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of
-    // the region open operation. No need to handle multi thread issues on one region's entry in
-    // this Map.
-    if (replayEdistsSize == null) {
-      replayEdistsSize = new ThreadSafeMemStoreSizing();
-      replayEditsPerRegion.put(regionName, replayEdistsSize);
-    }
-    replayEdistsSize.incMemStoreSize(memStoreSize);
-  }
-
-  /**
-   * Roll back the global MemStore size for a specified region when this region
-   * can't be opened.
-   *
-   * @param regionName the region which could not open.
-   */
-  public void rollbackRegionReplayEditsSize(byte[] regionName) {
-    MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
-    if (replayEditsSizing != null) {
-      clearRegionReplayEditsSize(regionName);
-      decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
-          replayEditsSizing.getOffHeapSize());
-    }
-  }
-
-  /**
-   * Clear a region from replayEditsPerRegion.
-   *
-   * @param regionName region name.
-   */
-  public void clearRegionReplayEditsSize(byte[] regionName) {
-    replayEditsPerRegion.remove(regionName);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index f408629..970911f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
@@ -300,16 +299,7 @@ public class OpenRegionHandler extends EventHandler {
       // and transition the node back to FAILED_OPEN. If that fails,
       // we rely on the Timeout Monitor in the master to reassign.
       LOG.error(
-          "Failed open of region=" + this.regionInfo.getRegionNameAsString()
-              + ", starting to roll back the global memstore size.", t);
-      // Decrease the global memstore size.
-      if (this.rsServices != null) {
-        RegionServerAccounting rsAccounting =
-          this.rsServices.getRegionServerAccounting();
-        if (rsAccounting != null) {
-          rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
-        }
-      }
+          "Failed open of region=" + this.regionInfo.getRegionNameAsString(), t);
     }
     return region;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
new file mode 100644
index 0000000..b0cbd58
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -0,0 +1,214 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HBASE-21031
+ * If replaying recovered edits fails, make sure the memstore is rolled back
+ * and, if MSLAB is used, that all of its chunks are released too.
+ */
+@Category({RegionServerTests.class, SmallTests.class })
+public class TestRecoveredEditsReplayAndAbort {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRecoveredEditsReplayAndAbort.class);
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestRecoveredEditsReplayAndAbort.class);
+
+  protected final byte[] row = Bytes.toBytes("rowA");
+
+  protected final static byte [] fam1 = Bytes.toBytes("colfamily11");
+
+  @Rule
+  public TestName name = new TestName();
+
+  // Test names
+  protected TableName tableName;
+  protected String method;
+
+  protected static HBaseTestingUtility TEST_UTIL;
+  public static Configuration CONF ;
+  private static FileSystem FILESYSTEM;
+  private HRegion region = null;
+
+  private final Random random = new Random();
+
+  @Before
+  public void setup() throws IOException {
+    TEST_UTIL = new HBaseTestingUtility();
+    FILESYSTEM = TEST_UTIL.getTestFileSystem();
+    CONF = TEST_UTIL.getConfiguration();
+    method = name.getMethodName();
+    tableName = TableName.valueOf(method);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void test() throws Exception {
+    // set flush size to 10MB
+    CONF.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024 * 10);
+    // set the report interval to a very small value
+    CONF.setInt("hbase.hstore.report.interval.edits", 1);
+    CONF.setInt("hbase.hstore.report.period", 0);
+    // mock a RegionServerServices
+    final RegionServerAccounting rsAccounting = new RegionServerAccounting(CONF);
+    RegionServerServices rs = Mockito.mock(RegionServerServices.class);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+    Mockito.when(rs.getRegionServerAccounting()).thenReturn(rsAccounting);
+    Mockito.when(rs.isAborted()).thenReturn(false);
+    Mockito.when(rs.getNonceManager()).thenReturn(null);
+    Mockito.when(rs.getServerName()).thenReturn(ServerName
+        .valueOf("test", 0, 111));
+    // create a region
+    TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable)
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam1).build())
+        .build();
+    HRegionInfo info = new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false);
+    Path logDir = TEST_UTIL
+        .getDataTestDirOnTestFS("TestRecoveredEidtsReplayAndAbort.log");
+    final WAL wal = HBaseTestingUtility.createWal(CONF, logDir, info);
+    Path rootDir = TEST_UTIL.getDataTestDir();
+    Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+    HRegionFileSystem
+        .createRegionOnFileSystem(CONF, TEST_UTIL.getTestFileSystem(), tableDir, info);
+    region = HRegion.newHRegion(tableDir, wal, TEST_UTIL.getTestFileSystem(), CONF, info,
+        htd, rs);
+    // create some recovered.edits files
+    final WALFactory wals = new WALFactory(CONF, method);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      Path recoveredEditsDir = WALSplitter
+          .getRegionDirRecoveredEditsDir(regiondir);
+      long maxSeqId = 1200;
+      long minSeqId = 1000;
+      long totalEdits = maxSeqId - minSeqId;
+      for (long i = minSeqId; i <= maxSeqId; i += 100) {
+        Path recoveredEdits = new Path(recoveredEditsDir,
+            String.format("%019d", i));
+        LOG.info("Begin to write recovered.edits : " + recoveredEdits);
+        fs.create(recoveredEdits).close(); // don't leak the stream; the writer re-creates it
+        WALProvider.Writer writer = wals
+            .createRecoveredEditsWriter(fs, recoveredEdits);
+        for (long j = i; j < i + 100; j++) {
+          long time = System.nanoTime();
+          WALEdit edit = new WALEdit();
+          // 200KB kv
+          byte[] value = new byte[200 * 1024];
+          random.nextBytes(value);
+          edit.add(
+              new KeyValue(row, fam1, Bytes.toBytes(j), time, KeyValue.Type.Put,
+                  value));
+          writer.append(new WAL.Entry(
+              new WALKeyImpl(regionName, tableName, j, time,
+                  HConstants.DEFAULT_CLUSTER_ID), edit));
+        }
+        writer.close();
+      }
+      MonitoredTask status = TaskMonitor.get().createStatus(method);
+      // try to replay the edits; progress() fails part-way through to force an abort
+      try {
+        region.initialize(new CancelableProgressable() {
+          private long replayedEdits = 0;
+
+          @Override
+          public boolean progress() {
+            replayedEdits++;
+            // during replay, the global memstore size tracked by rsAccounting must equal
+            // this region's memstore size, because this is the only memstore here
+            Assert.assertEquals(rsAccounting.getGlobalMemStoreDataSize(),
+                region.getMemStoreDataSize());
+            Assert.assertEquals(rsAccounting.getGlobalMemStoreHeapSize(),
+                region.getMemStoreHeapSize());
+            Assert.assertEquals(rsAccounting.getGlobalMemStoreOffHeapSize(),
+                region.getMemStoreOffHeapSize());
+            // abort the replay before it finishes, leaving some edits in memory
+            return replayedEdits < totalEdits - 10;
+          }
+        });
+        Assert.fail("Should not reach here");
+      } catch (IOException t) {
+        LOG.info("Current memstore: " + region.getMemStoreDataSize() + ", " + region
+            .getMemStoreHeapSize() + ", " + region
+            .getMemStoreOffHeapSize());
+      }
+      // after the aborted replay, no data should remain in the memstore
+      Assert.assertEquals(0, rsAccounting.getGlobalMemStoreDataSize());
+      Assert.assertEquals(0, region.getMemStoreDataSize());
+      // All MSLAB chunks should have been recycled; any chunk still mapped here
+      // indicates a memory leak.
+      Assert.assertEquals(0, ChunkCreator.getInstance().numberOfMappedChunks());
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+      wals.close();
+    }
+  }
+}