You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2018/08/22 14:09:35 UTC
hbase git commit: HBASE-21031 Memory leak if replay edits failed during region opening
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 dcf8a2318 -> e72848a56
HBASE-21031 Memory leak if replay edits failed during region opening
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e72848a5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e72848a5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e72848a5
Branch: refs/heads/branch-2.0
Commit: e72848a56afb9b563194cda18c21f8f158b5e899
Parents: dcf8a23
Author: Allan Yang <al...@apache.org>
Authored: Wed Aug 22 22:08:33 2018 +0800
Committer: Allan Yang <al...@apache.org>
Committed: Wed Aug 22 22:08:33 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 49 ++++-
.../hadoop/hbase/regionserver/HStore.java | 4 +
.../regionserver/RegionServerAccounting.java | 52 -----
.../regionserver/handler/OpenRegionHandler.java | 12 +-
.../TestRecoveredEditsReplayAndAbort.java | 214 +++++++++++++++++++
5 files changed, 259 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ba75a2e..d640b54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -857,7 +857,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return What the next sequence (edit) id should be.
* @throws IOException e
*/
- private long initialize(final CancelableProgressable reporter) throws IOException {
+ @VisibleForTesting
+ long initialize(final CancelableProgressable reporter) throws IOException {
//Refuse to open the region if there is no column family in the table
if (htableDescriptor.getColumnFamilyCount() == 0) {
@@ -870,6 +871,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
nextSeqId = initializeRegionInternals(reporter, status);
return nextSeqId;
+ } catch (IOException e) {
+ LOG.warn("Failed initialize of region= {}, starting to roll back memstore",
+ getRegionInfo().getRegionNameAsString(), e);
+ // global memstore size will be decreased when dropping memstore
+ try {
+ //drop the memory used by memstore if open region fails
+ dropMemStoreContents();
+ } catch (IOException ioE) {
+ if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
+ LOG.warn("Failed drop memstore of region= {}, "
+ + "some chunks may not released forever since MSLAB is enabled",
+ getRegionInfo().getRegionNameAsString());
+ }
+
+ }
+ throw e;
} finally {
// nextSeqid will be -1 if the initialization fails.
// At least it will be 0 otherwise.
@@ -4425,11 +4442,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
}
- // The edits size added into rsAccounting during this replaying will not
- // be required any more. So just clear it.
- if (this.rsAccounting != null) {
- this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
- }
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
@@ -4611,9 +4623,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
editsCount++;
}
MemStoreSize mss = memStoreSizing.getMemStoreSize();
- if (this.rsAccounting != null) {
- rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
- }
incMemStoreSize(mss);
flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
if (flush) {
@@ -5056,6 +5065,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
+ * Be careful, this method will drop all data in the memstore of this region.
+ * Currently, this method is used to drop memstore to prevent memory leak
+ * when replaying recovered.edits while opening region.
+ */
+ public MemStoreSize dropMemStoreContents() throws IOException {
+ MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing(); // summed over all stores
+ this.updatesLock.writeLock().lock(); // held for the whole drop; released in finally
+ try {
+ for (HStore s : stores.values()) { // drop every store's memstore, regardless of seqId
+ MemStoreSize memStoreSize = doDropStoreMemStoreContentsForSeqId(s, HConstants.NO_SEQNUM);
+ LOG.info("Drop memstore for Store {} in region {}, dropped memstoresize: [{}]",
+ s.getColumnFamilyName(), this.getRegionInfo().getRegionNameAsString(),
+ memStoreSize);
+ totalFreedSize.incMemStoreSize(memStoreSize);
+ }
+ return totalFreedSize.getMemStoreSize();
+ } finally {
+ this.updatesLock.writeLock().unlock();
+ }
+ }
+
+ /**
* Drops the memstore contents after replaying a flush descriptor or region open event replay
* if the memstore edits have seqNums smaller than the given seq id
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 3943de1..7adeb85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2321,6 +2321,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
@Override
public void abort() throws IOException {
if (snapshot != null) {
+ // Close the snapshot on abort; otherwise its segment scanners are never closed.
+ // When MSLAB is enabled, the chunks referenced by those scanners could then
+ // never be released, leaking memory.
+ snapshot.close();
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 4e66fc7..baa9a6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -19,14 +19,11 @@
package org.apache.hadoop.hbase.regionserver;
import java.lang.management.MemoryType;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -43,11 +40,6 @@ public class RegionServerAccounting {
// memstore off-heap size.
private final LongAdder globalMemStoreOffHeapSize = new LongAdder();
- // Store the edits size during replaying WAL. Use this to roll back the
- // global memstore size once a region opening failed.
- private final ConcurrentMap<byte[], MemStoreSizing> replayEditsPerRegion =
- new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
-
private long globalMemStoreLimit;
private final float globalMemStoreLimitLowMarkPercent;
private long globalMemStoreLimitLowMark;
@@ -216,48 +208,4 @@ public class RegionServerAccounting {
getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark);
}
}
-
- /***
- * Add memStoreSize to replayEditsPerRegion.
- *
- * @param regionName region name.
- * @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
- */
- public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) {
- MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName);
- // All ops on the same MemStoreSize object is going to be done by single thread, sequentially
- // only. First calls to this method to increment the per region reply edits size and then call
- // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of
- // the region open operation. No need to handle multi thread issues on one region's entry in
- // this Map.
- if (replayEdistsSize == null) {
- replayEdistsSize = new ThreadSafeMemStoreSizing();
- replayEditsPerRegion.put(regionName, replayEdistsSize);
- }
- replayEdistsSize.incMemStoreSize(memStoreSize);
- }
-
- /**
- * Roll back the global MemStore size for a specified region when this region
- * can't be opened.
- *
- * @param regionName the region which could not open.
- */
- public void rollbackRegionReplayEditsSize(byte[] regionName) {
- MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
- if (replayEditsSizing != null) {
- clearRegionReplayEditsSize(regionName);
- decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
- replayEditsSizing.getOffHeapSize());
- }
- }
-
- /**
- * Clear a region from replayEditsPerRegion.
- *
- * @param regionName region name.
- */
- public void clearRegionReplayEditsSize(byte[] regionName) {
- replayEditsPerRegion.remove(regionName);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index f408629..970911f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
@@ -300,16 +299,7 @@ public class OpenRegionHandler extends EventHandler {
// and transition the node back to FAILED_OPEN. If that fails,
// we rely on the Timeout Monitor in the master to reassign.
LOG.error(
- "Failed open of region=" + this.regionInfo.getRegionNameAsString()
- + ", starting to roll back the global memstore size.", t);
- // Decrease the global memstore size.
- if (this.rsServices != null) {
- RegionServerAccounting rsAccounting =
- this.rsServices.getRegionServerAccounting();
- if (rsAccounting != null) {
- rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
- }
- }
+ "Failed open of region=" + this.regionInfo.getRegionNameAsString(), t);
}
return region;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e72848a5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
new file mode 100644
index 0000000..b0cbd58
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -0,0 +1,214 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HBASE-21031
+ * If replaying recovered edits fails, the memstore must be rolled back,
+ * and, if MSLAB is used, all of its chunks must be released as well.
+ */
+@Category({RegionServerTests.class, SmallTests.class })
+public class TestRecoveredEditsReplayAndAbort {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRecoveredEditsReplayAndAbort.class);
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestRecoveredEditsReplayAndAbort.class);
+
+ protected final byte[] row = Bytes.toBytes("rowA");
+
+ protected final static byte [] fam1 = Bytes.toBytes("colfamily11");
+
+ @Rule
+ public TestName name = new TestName();
+
+ // Test names
+ protected TableName tableName;
+ protected String method;
+
+ protected static HBaseTestingUtility TEST_UTIL;
+ public static Configuration CONF ;
+ private static FileSystem FILESYSTEM;
+ private HRegion region = null;
+
+ private final Random random = new Random();
+
+ @Before
+ public void setup() throws IOException {
+ TEST_UTIL = new HBaseTestingUtility();
+ FILESYSTEM = TEST_UTIL.getTestFileSystem();
+ CONF = TEST_UTIL.getConfiguration();
+ method = name.getMethodName();
+ tableName = TableName.valueOf(method);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ @Test
+ public void test() throws Exception {
+ //set flush size to 10MB
+ CONF.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024 * 10);
+ //set the report interval to a very small value
+ CONF.setInt("hbase.hstore.report.interval.edits", 1);
+ CONF.setInt("hbase.hstore.report.period", 0);
+ //mock a RegionServerServices
+ final RegionServerAccounting rsAccounting = new RegionServerAccounting(CONF);
+ RegionServerServices rs = Mockito.mock(RegionServerServices.class);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+ Mockito.when(rs.getRegionServerAccounting()).thenReturn(rsAccounting);
+ Mockito.when(rs.isAborted()).thenReturn(false);
+ Mockito.when(rs.getNonceManager()).thenReturn(null);
+ Mockito.when(rs.getServerName()).thenReturn(ServerName
+ .valueOf("test", 0, 111));
+ //create a region
+ TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable)
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam1).build())
+ .build();
+ HRegionInfo info = new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false);
+ Path logDir = TEST_UTIL
+ .getDataTestDirOnTestFS("TestRecoveredEidtsReplayAndAbort.log");
+ final WAL wal = HBaseTestingUtility.createWal(CONF, logDir, info);
+ Path rootDir = TEST_UTIL.getDataTestDir();
+ Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+ HRegionFileSystem
+ .createRegionOnFileSystem(CONF, TEST_UTIL.getTestFileSystem(), tableDir, info);
+ region = HRegion.newHRegion(tableDir, wal, TEST_UTIL.getTestFileSystem(), CONF, info,
+ htd, rs);
+ //create some recovered.edits
+ final WALFactory wals = new WALFactory(CONF, method);
+ try {
+ Path regiondir = region.getRegionFileSystem().getRegionDir();
+ FileSystem fs = region.getRegionFileSystem().getFileSystem();
+ byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+ Path recoveredEditsDir = WALSplitter
+ .getRegionDirRecoveredEditsDir(regiondir);
+ long maxSeqId = 1200;
+ long minSeqId = 1000;
+ long totalEdits = maxSeqId - minSeqId + 100; // 3 files x 100 edits each
+ for (long i = minSeqId; i <= maxSeqId; i += 100) {
+ Path recoveredEdits = new Path(recoveredEditsDir,
+ String.format("%019d", i));
+ LOG.info("Begin to write recovered.edits : " + recoveredEdits);
+ fs.create(recoveredEdits).close(); // also creates parent dir; close to avoid a leak
+ WALProvider.Writer writer = wals
+ .createRecoveredEditsWriter(fs, recoveredEdits);
+ for (long j = i; j < i + 100; j++) {
+ long time = System.nanoTime();
+ WALEdit edit = new WALEdit();
+ // 200KB kv
+ byte[] value = new byte[200 * 1024];
+ random.nextBytes(value);
+ edit.add(
+ new KeyValue(row, fam1, Bytes.toBytes(j), time, KeyValue.Type.Put,
+ value));
+ writer.append(new WAL.Entry(
+ new WALKeyImpl(regionName, testTable, j, time,
+ HConstants.DEFAULT_CLUSTER_ID), edit));
+ }
+ writer.close();
+ }
+ MonitoredTask status = TaskMonitor.get().createStatus(method);
+ //try to replay the edits
+ try {
+ region.initialize(new CancelableProgressable() {
+ private long replayedEdits = 0;
+
+ @Override
+ public boolean progress() {
+ replayedEdits++;
+ //during replay, rsAccounting should align with global memstore, because
+ //there is only one memstore here
+ Assert.assertEquals(rsAccounting.getGlobalMemStoreDataSize(),
+ region.getMemStoreDataSize());
+ Assert.assertEquals(rsAccounting.getGlobalMemStoreHeapSize(),
+ region.getMemStoreHeapSize());
+ Assert.assertEquals(rsAccounting.getGlobalMemStoreOffHeapSize(),
+ region.getMemStoreOffHeapSize());
+ // abort the replay before finishing, leaving some edits in the memory
+ return replayedEdits < totalEdits - 10;
+ }
+ });
+ Assert.fail("Should not reach here");
+ } catch (IOException t) {
+ LOG.info("Current memstore: " + region.getMemStoreDataSize() + ", " + region
+ .getMemStoreHeapSize() + ", " + region
+ .getMemStoreOffHeapSize());
+ }
+ //After aborting replay, there should be no data in the memory
+ Assert.assertEquals(0, rsAccounting.getGlobalMemStoreDataSize());
+ Assert.assertEquals(0, region.getMemStoreDataSize());
+ //All chunks in the MSLAB should be recycled; otherwise, there might be
+ //a memory leak.
+ Assert.assertEquals(0, ChunkCreator.getInstance().numberOfMappedChunks());
+ } finally {
+ HBaseTestingUtility.closeRegionAndWAL(this.region);
+ this.region = null;
+ wals.close();
+ }
+ }
+}