You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/12 13:11:04 UTC
[08/34] hbase git commit: HBASE-20411 Ameliorate MutableSegment
synchronize
HBASE-20411 Ameliorate MutableSegment synchronize
Change the MemStore size accounting so we don't synchronize across three
volatiles when applying deltas. Instead:
+ Make MemStoreSize, a data structure holding our memstore size longs, immutable.
+ Undo MemStoreSizing being an instance of MemStoreSize; instead it has-a MemStoreSize.
+ Make two MemStoreSizing implementations; one thread-safe, the other not.
+ Let all memory sizing longs run independently, untied by
synchronization (Huaxiang and Anoop's suggestion), using AtomicLongs.
+ Review all uses of MemStoreSizing. Many are single-threaded and do
not need to be synchronized; use the non-thread-safe counter.
TODO: Use this accounting technique at the global level too.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/021f66d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/021f66d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/021f66d1
Branch: refs/heads/HBASE-19064
Commit: 021f66d11d2cbb7308308093e29e69d6e7661ee9
Parents: c60578d
Author: Michael Stack <st...@apache.org>
Authored: Sun May 6 21:29:49 2018 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Sat May 12 02:17:50 2018 +0100
----------------------------------------------------------------------
.../hbase/regionserver/AbstractMemStore.java | 8 +-
.../regionserver/CSLMImmutableSegment.java | 2 +-
.../regionserver/CellArrayImmutableSegment.java | 6 +-
.../regionserver/CellChunkImmutableSegment.java | 14 +-
.../hbase/regionserver/CompactingMemStore.java | 38 ++---
.../hbase/regionserver/CompactionPipeline.java | 44 ++---
.../regionserver/CompositeImmutableSegment.java | 10 +-
.../hbase/regionserver/DefaultMemStore.java | 39 ++---
.../hadoop/hbase/regionserver/HRegion.java | 162 ++++++++++---------
.../hadoop/hbase/regionserver/HStore.java | 2 +-
.../hadoop/hbase/regionserver/MemStore.java | 3 +-
.../hbase/regionserver/MemStoreFlusher.java | 21 ++-
.../hadoop/hbase/regionserver/MemStoreSize.java | 76 ++++-----
.../hbase/regionserver/MemStoreSizing.java | 113 ++++++++-----
.../hbase/regionserver/MutableSegment.java | 4 +-
.../NonThreadSafeMemStoreSizing.java | 81 ++++++++++
.../regionserver/RegionServerAccounting.java | 33 ++--
.../regionserver/RegionServicesForStores.java | 4 +-
.../hbase/regionserver/ScannerContext.java | 3 -
.../hadoop/hbase/regionserver/Segment.java | 64 ++++----
.../regionserver/ThreadSafeMemStoreSizing.java | 80 +++++++++
.../regionserver/TestCompactingMemStore.java | 44 ++---
.../TestCompactingToCellFlatMapMemStore.java | 43 ++---
.../hbase/regionserver/TestDefaultMemStore.java | 13 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 12 +-
.../regionserver/TestHRegionReplayEvents.java | 28 ++--
.../TestHRegionWithInMemoryFlush.java | 2 +-
.../hadoop/hbase/regionserver/TestHStore.java | 41 +++--
28 files changed, 595 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 9f4fd2f..b82afba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -165,13 +165,7 @@ public abstract class AbstractMemStore implements MemStore {
@Override
public MemStoreSize getSnapshotSize() {
- return getSnapshotSizing();
- }
-
- MemStoreSizing getSnapshotSizing() {
- return new MemStoreSizing(this.snapshot.keySize(),
- this.snapshot.heapSize(),
- this.snapshot.offHeapSize());
+ return this.snapshot.getMemStoreSize();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
index 6af84cb..9e206ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
@@ -40,7 +40,7 @@ public class CSLMImmutableSegment extends ImmutableSegment {
super(segment);
// update the segment metadata heap size
long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
- incSize(0, indexOverhead, 0); // CSLM is always on-heap
+ incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
index 4631200..dadfc48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
- incSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
+ incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
initializeCellSet(numOfCells, iterator, action);
}
@@ -59,7 +59,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class
long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
- incSize(0, indexOverhead, 0); // CAM is always on-heap
+ incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
@@ -67,7 +67,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
- incSize(0, newSegmentSizeDelta, 0);
+ incMemStoreSize(0, newSegmentSizeDelta, 0);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
index 0eebfb5..e2f8205 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -58,9 +58,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
boolean onHeap = getMemStoreLAB().isOnHeap();
// initiate the heapSize with the size of the segment metadata
if(onHeap) {
- incSize(0, indexOverhead, 0);
+ incMemStoreSize(0, indexOverhead, 0);
} else {
- incSize(0, 0, indexOverhead);
+ incMemStoreSize(0, 0, indexOverhead);
}
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
initializeCellSet(numOfCells, iterator, action);
@@ -79,9 +79,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
boolean onHeap = getMemStoreLAB().isOnHeap();
// initiate the heapSize with the size of the segment metadata
if(onHeap) {
- incSize(0, indexOverhead, 0);
+ incMemStoreSize(0, indexOverhead, 0);
} else {
- incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
+ incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
}
int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap
@@ -92,10 +92,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
if(onHeap) {
- incSize(0, newSegmentSizeDelta, 0);
+ incMemStoreSize(0, newSegmentSizeDelta, 0);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
} else {
- incSize(0, 0, newSegmentSizeDelta);
+ incMemStoreSize(0, 0, newSegmentSizeDelta);
memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta);
}
@@ -333,7 +333,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
long heapOverhead = newHeapSize - oldHeapSize;
long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
//TODO: maybe need to update the dataSize of the region
- incSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
+ incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
return cell;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index c1b061a..fc3e5ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -155,12 +155,12 @@ public class CompactingMemStore extends AbstractMemStore {
*/
@Override
public MemStoreSize size() {
- MemStoreSizing memstoreSizing = new MemStoreSizing();
+ MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
memstoreSizing.incMemStoreSize(active.getMemStoreSize());
for (Segment item : pipeline.getSegments()) {
memstoreSizing.incMemStoreSize(item.getMemStoreSize());
}
- return memstoreSizing;
+ return memstoreSizing.getMemStoreSize();
}
/**
@@ -216,42 +216,38 @@ public class CompactingMemStore extends AbstractMemStore {
return new MemStoreSnapshot(snapshotId, this.snapshot);
}
- /**
- * On flush, how much memory we will clear.
- * @return size of data that is going to be flushed
- */
@Override
public MemStoreSize getFlushableSize() {
- MemStoreSizing snapshotSizing = getSnapshotSizing();
- if (snapshotSizing.getDataSize() == 0) {
+ MemStoreSize mss = getSnapshotSize();
+ if (mss.getDataSize() == 0) {
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) {
- snapshotSizing = pipeline.getPipelineSizing();
- snapshotSizing.incMemStoreSize(active.getMemStoreSize());
+ MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
+ memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
+ mss = memStoreSizing.getMemStoreSize();
} else {
- snapshotSizing = pipeline.getTailSizing();
+ mss = pipeline.getTailSize();
}
}
- return snapshotSizing.getDataSize() > 0 ? snapshotSizing
- : new MemStoreSize(active.getMemStoreSize());
+ return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
}
@Override
protected long keySize() {
- // Need to consider keySize of all segments in pipeline and active
- long k = this.active.keySize();
+ // Need to consider dataSize/keySize of all segments in pipeline and active
+ long keySize = this.active.getDataSize();
for (Segment segment : this.pipeline.getSegments()) {
- k += segment.keySize();
+ keySize += segment.getDataSize();
}
- return k;
+ return keySize;
}
@Override
protected long heapSize() {
// Need to consider heapOverhead of all segments in pipeline and active
- long h = this.active.heapSize();
+ long h = this.active.getHeapSize();
for (Segment segment : this.pipeline.getSegments()) {
- h += segment.heapSize();
+ h += segment.getHeapSize();
}
return h;
}
@@ -447,7 +443,7 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting
protected boolean shouldFlushInMemory() {
- if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
+ if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
return false; // regardless the size
}
@@ -571,7 +567,7 @@ public class CompactingMemStore extends AbstractMemStore {
// debug method
public void debug() {
- String msg = "active size=" + this.active.keySize();
+ String msg = "active size=" + this.active.getDataSize();
msg += " in-memory flush size is "+ inmemoryFlushSize;
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 907df72..f8aa3ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -23,11 +23,12 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
/**
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@@ -135,19 +136,21 @@ public class CompactionPipeline {
// update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix);
long newDataSize = 0;
- if(segment != null) newDataSize = segment.keySize();
+ if(segment != null) {
+ newDataSize = segment.getDataSize();
+ }
long dataSizeDelta = suffixDataSize - newDataSize;
long suffixHeapSize = getSegmentsHeapSize(suffix);
long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
long newHeapSize = 0;
long newOffHeapSize = 0;
if(segment != null) {
- newHeapSize = segment.heapSize();
- newOffHeapSize = segment.offHeapSize();
+ newHeapSize = segment.getHeapSize();
+ newOffHeapSize = segment.getOffHeapSize();
}
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
long heapSizeDelta = suffixHeapSize - newHeapSize;
- region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta));
+ region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
LOG.debug("Suffix data size={}, new segment data size={}, "
+ "suffix heap size={}," + "new segment heap size={}"
+ "suffix off heap size={}," + "new segment off heap size={}"
@@ -164,7 +167,7 @@ public class CompactionPipeline {
private static long getSegmentsHeapSize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
- res += segment.heapSize();
+ res += segment.getHeapSize();
}
return res;
}
@@ -172,7 +175,7 @@ public class CompactionPipeline {
private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
- res += segment.offHeapSize();
+ res += segment.getOffHeapSize();
}
return res;
}
@@ -180,7 +183,7 @@ public class CompactionPipeline {
private static long getSegmentsKeySize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
- res += segment.keySize();
+ res += segment.getDataSize();
}
return res;
}
@@ -211,15 +214,17 @@ public class CompactionPipeline {
int i = 0;
for (ImmutableSegment s : pipeline) {
if ( s.canBeFlattened() ) {
- MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
+ // size to be updated
+ MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
replaceAtIndex(i,newS);
if(region != null) {
- // update the global memstore size counter
- // upon flattening there is no change in the data size
- region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize(),
- newMemstoreAccounting.getOffHeapSize()));
+ // Update the global memstore size counter upon flattening there is no change in the
+ // data size
+ MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
+ Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
+ region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
}
LOG.debug("Compaction pipeline segment {} flattened", s);
return true;
@@ -254,19 +259,18 @@ public class CompactionPipeline {
return minSequenceId;
}
- public MemStoreSizing getTailSizing() {
+ public MemStoreSize getTailSize() {
LinkedList<? extends Segment> localCopy = readOnlyCopy;
- if (localCopy.isEmpty()) return new MemStoreSizing();
- return new MemStoreSizing(localCopy.peekLast().getMemStoreSize());
+ return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize();
}
- public MemStoreSizing getPipelineSizing() {
- MemStoreSizing memStoreSizing = new MemStoreSizing();
+ public MemStoreSize getPipelineSize() {
+ MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
LinkedList<? extends Segment> localCopy = readOnlyCopy;
for (Segment segment : localCopy) {
memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
}
- return memStoreSizing;
+ return memStoreSizing.getMemStoreSize();
}
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index 98e0981..1fd2f23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -48,7 +48,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
for (ImmutableSegment s : segments) {
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
- this.keySize += s.keySize();
+ this.keySize += s.getDataSize();
}
}
@@ -170,7 +170,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* @return Sum of all cell sizes.
*/
@Override
- public long keySize() {
+ public long getDataSize() {
return this.keySize;
}
@@ -178,10 +178,10 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* @return The heap size of this segment.
*/
@Override
- public long heapSize() {
+ public long getHeapSize() {
long result = 0;
for (ImmutableSegment s : segments) {
- result += s.heapSize();
+ result += s.getHeapSize();
}
return result;
}
@@ -190,7 +190,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* Updates the heap size counter of the segment by the given delta
*/
@Override
- protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
+ public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index ddeaddf..5dcf48b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -97,26 +97,20 @@ public class DefaultMemStore extends AbstractMemStore {
return new MemStoreSnapshot(this.snapshotId, this.snapshot);
}
- /**
- * On flush, how much memory we will clear from the active cell set.
- *
- * @return size of data that is going to be flushed from active set
- */
@Override
public MemStoreSize getFlushableSize() {
- MemStoreSize snapshotSize = getSnapshotSize();
- return snapshotSize.getDataSize() > 0 ? snapshotSize
- : new MemStoreSize(active.getMemStoreSize());
+ MemStoreSize mss = getSnapshotSize();
+ return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
}
@Override
protected long keySize() {
- return this.active.keySize();
+ return this.active.getDataSize();
}
@Override
protected long heapSize() {
- return this.active.heapSize();
+ return this.active.getHeapSize();
}
@Override
@@ -154,7 +148,7 @@ public class DefaultMemStore extends AbstractMemStore {
@Override
public MemStoreSize size() {
- return new MemStoreSize(active.getMemStoreSize());
+ return active.getMemStoreSize();
}
/**
@@ -193,26 +187,27 @@ public class DefaultMemStore extends AbstractMemStore {
byte [] fam = Bytes.toBytes("col");
byte [] qf = Bytes.toBytes("umn");
byte [] empty = new byte[0];
- MemStoreSizing memstoreSizing = new MemStoreSizing();
+ MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
for (int i = 0; i < count; i++) {
// Give each its own ts
- memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
+ memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
}
- LOG.info("memstore1 estimated size="
- + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
+ LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
+ memStoreSizing.getMemStoreSize().getHeapSize());
for (int i = 0; i < count; i++) {
- memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
+ memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
}
- LOG.info("memstore1 estimated size (2nd loading of same data)="
- + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
+ LOG.info("memstore1 estimated size (2nd loading of same data)={}",
+ memStoreSizing.getMemStoreSize().getDataSize() +
+ memStoreSizing.getMemStoreSize().getHeapSize());
// Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore();
- memstoreSizing = new MemStoreSizing();
+ memStoreSizing = new NonThreadSafeMemStoreSizing();
for (int i = 0; i < count; i++) {
- memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing);
+ memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
}
- LOG.info("memstore2 estimated size="
- + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
+ LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
+ memStoreSizing.getMemStoreSize().getHeapSize());
final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
LOG.info("Exiting.");
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1fb6afe..4926398 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -292,7 +292,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
// Track data size in all memstores
- private final MemStoreSizing memStoreSize = new MemStoreSizing();
+ private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
// Debug possible data loss due to WAL off
@@ -1210,36 +1210,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Increase the size of mem store in this region and the size of global mem
* store
*/
- public void incMemStoreSize(MemStoreSize memStoreSize) {
+ void incMemStoreSize(MemStoreSize mss) {
+ incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
+ }
+
+ void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
if (this.rsAccounting != null) {
- rsAccounting.incGlobalMemStoreSize(memStoreSize);
+ rsAccounting.incGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
}
- long dataSize;
- synchronized (this.memStoreSize) {
- this.memStoreSize.incMemStoreSize(memStoreSize);
- dataSize = this.memStoreSize.getDataSize();
- }
- checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize());
+ long dataSize =
+ this.memStoreSizing.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
+ checkNegativeMemStoreDataSize(dataSize, dataSizeDelta);
}
- public void decrMemStoreSize(MemStoreSize memStoreSize) {
+ void decrMemStoreSize(MemStoreSize mss) {
+ decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
+ }
+
+ void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
if (this.rsAccounting != null) {
- rsAccounting.decGlobalMemStoreSize(memStoreSize);
- }
- long size;
- synchronized (this.memStoreSize) {
- this.memStoreSize.decMemStoreSize(memStoreSize);
- size = this.memStoreSize.getDataSize();
+ rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
}
- checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize());
+ long dataSize =
+ this.memStoreSizing.decMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
+ checkNegativeMemStoreDataSize(dataSize, -dataSizeDelta);
}
private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) {
- // This is extremely bad if we make memStoreSize negative. Log as much info on the offending
- // caller as possible. (memStoreSize might be a negative value already -- freeing memory)
+ // This is extremely bad if we make memStoreSizing negative. Log as much info on the offending
+ // caller as possible. (memStoreSizing might be a negative value already -- freeing memory)
if (memStoreDataSize < 0) {
LOG.error("Asked to modify this region's (" + this.toString()
- + ") memStoreSize to a negative value which is incorrect. Current memStoreSize="
+ + ") memStoreSizing to a negative value which is incorrect. Current memStoreSizing="
+ (memStoreDataSize - delta) + ", delta=" + delta, new Exception());
}
}
@@ -1274,17 +1276,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public long getMemStoreDataSize() {
- return memStoreSize.getDataSize();
+ return memStoreSizing.getDataSize();
}
@Override
public long getMemStoreHeapSize() {
- return memStoreSize.getHeapSize();
+ return memStoreSizing.getHeapSize();
}
@Override
public long getMemStoreOffHeapSize() {
- return memStoreSize.getOffHeapSize();
+ return memStoreSizing.getOffHeapSize();
}
/** @return store services for this region, to access services required by store level needs */
@@ -1555,7 +1557,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int failedfFlushCount = 0;
int flushCount = 0;
long tmp = 0;
- long remainingSize = this.memStoreSize.getDataSize();
+ long remainingSize = this.memStoreSizing.getDataSize();
while (remainingSize > 0) {
try {
internalFlushcache(status);
@@ -1564,7 +1566,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
" (carrying snapshot?) " + this);
}
flushCount++;
- tmp = this.memStoreSize.getDataSize();
+ tmp = this.memStoreSizing.getDataSize();
if (tmp >= remainingSize) {
failedfFlushCount++;
}
@@ -1598,13 +1600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// close each store in parallel
for (HStore store : stores.values()) {
- MemStoreSize flushableSize = store.getFlushableSize();
- if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
+ MemStoreSize mss = store.getFlushableSize();
+ if (!(abort || mss.getDataSize() == 0 || writestate.readOnly)) {
if (getRegionServerServices() != null) {
getRegionServerServices().abort("Assertion failed while closing store "
+ getRegionInfo().getRegionNameAsString() + " " + store
- + ". flushableSize expected=0, actual= " + flushableSize
- + ". Current memStoreSize=" + getMemStoreDataSize() + ". Maybe a coprocessor "
+ + ". flushableSize expected=0, actual={" + mss
+ + "}. Current memStoreSize=" + this.memStoreSizing.getMemStoreSize() +
+ ". Maybe a coprocessor "
+ "operation failed and left the memstore in a partially updated state.", null);
}
}
@@ -1647,9 +1650,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.closed.set(true);
if (!canFlush) {
- this.decrMemStoreSize(new MemStoreSize(memStoreSize));
- } else if (memStoreSize.getDataSize() != 0) {
- LOG.error("Memstore data size is " + memStoreSize.getDataSize());
+ decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
+ } else if (this.memStoreSizing.getDataSize() != 0) {
+ LOG.error("Memstore data size is {}", this.memStoreSizing.getDataSize());
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
@@ -1782,7 +1785,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return True if its worth doing a flush before we put up the close flag.
*/
private boolean worthPreFlushing() {
- return this.memStoreSize.getDataSize() >
+ return this.memStoreSizing.getDataSize() >
this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
}
@@ -2400,12 +2403,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs
// to no other that it can use to associate with the bulk load. Hence this little dance below
// to go get one.
- if (this.memStoreSize.getDataSize() <= 0) {
+ if (this.memStoreSizing.getDataSize() <= 0) {
// Take an update lock so no edits can come into memory just yet.
this.updatesLock.writeLock().lock();
WriteEntry writeEntry = null;
try {
- if (this.memStoreSize.getDataSize() <= 0) {
+ if (this.memStoreSizing.getDataSize() <= 0) {
// Presume that if there are still no edits in the memstore, then there are no edits for
// this region out in the WAL subsystem so no need to do any trickery clearing out
// edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
@@ -2447,7 +2450,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
- MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing();
+ MemStoreSizing totalSizeOfFlushableStores = new NonThreadSafeMemStoreSizing();
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
for (HStore store : storesToFlush) {
@@ -2536,14 +2539,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!isAllFamilies(storesToFlush)) {
perCfExtras = new StringBuilder();
for (HStore store: storesToFlush) {
+ MemStoreSize mss = store.getFlushableSize();
perCfExtras.append("; ").append(store.getColumnFamilyName());
- perCfExtras.append("=")
- .append(StringUtils.byteDesc(store.getFlushableSize().getDataSize()));
+ perCfExtras.append("={dataSize=")
+ .append(StringUtils.byteDesc(mss.getDataSize()));
+ perCfExtras.append(", heapSize=")
+ .append(StringUtils.byteDesc(mss.getHeapSize()));
+ perCfExtras.append(", offHeapSize=")
+ .append(StringUtils.byteDesc(mss.getOffHeapSize()));
+ perCfExtras.append("}");
}
}
+ MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," +
- " memstore data size=" + StringUtils.byteDesc(this.memStoreSize.getDataSize()) +
- " memstore heap size=" + StringUtils.byteDesc(this.memStoreSize.getHeapSize()) +
+ " memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) +
+ " memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
}
@@ -2663,7 +2673,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
storeFlushCtxs.clear();
// Set down the memstore size by amount of flush.
- this.decrMemStoreSize(prepareResult.totalFlushableSize);
+ MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
+ this.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
// Increase the size of this Region for the purposes of quota. Noop if quotas are disabled.
// During startup, quota manager may not be initialized yet.
@@ -2740,12 +2751,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
long time = EnvironmentEdgeManager.currentTime() - startTime;
- long flushableDataSize = prepareResult.totalFlushableSize.getDataSize();
- long flushableHeapSize = prepareResult.totalFlushableSize.getHeapSize();
- long memstoresize = this.memStoreSize.getDataSize();
+ MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
+ long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize();
String msg = "Finished memstore flush;"
- + " data size ~" + StringUtils.byteDesc(flushableDataSize) + "/" + flushableDataSize
- + ", heap size ~" + StringUtils.byteDesc(flushableHeapSize) + "/" + flushableHeapSize
+ + " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
+ + ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
+ ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid="
+ flushOpSeqId + ", compaction requested=" + compactionRequested
@@ -2755,7 +2765,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (rsServices != null && rsServices.getMetrics() != null) {
rsServices.getMetrics().updateFlush(time - startTime,
- prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize);
+ mss.getDataSize(), flushedOutputFileSize);
}
return new FlushResultImpl(compactionRequested ?
@@ -3067,7 +3077,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected void writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
throws IOException {
- MemStoreSizing memStoreAccounting = new MemStoreSizing();
+ MemStoreSizing memStoreAccounting = new NonThreadSafeMemStoreSizing();
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
// We need to update the sequence id for following reasons.
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
@@ -3080,7 +3090,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return true;
});
// update memStore size
- region.incMemStoreSize(memStoreAccounting);
+ region.incMemStoreSize(memStoreAccounting.getDataSize(), memStoreAccounting.getHeapSize(),
+ memStoreAccounting.getOffHeapSize());
}
public boolean isDone() {
@@ -4274,8 +4285,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return;
- if (this.memStoreSize.getHeapSize()
- + this.memStoreSize.getOffHeapSize() > this.blockingMemStoreSize) {
+ MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
+ if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
blockedRequestsCount.increment();
requestFlush();
// Don't print current limit because it will vary too much. The message is used as a key
@@ -4645,7 +4656,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
boolean flush = false;
- MemStoreSizing memstoreSize = new MemStoreSizing();
+ MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
@@ -4688,15 +4699,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
PrivateCellUtil.setSequenceId(cell, currentReplaySeqId);
- restoreEdit(store, cell, memstoreSize);
+ restoreEdit(store, cell, memStoreSizing);
editsCount++;
}
+ MemStoreSize mss = memStoreSizing.getMemStoreSize();
if (this.rsAccounting != null) {
- rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(),
- memstoreSize);
+ rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
}
- incMemStoreSize(memstoreSize);
- flush = isFlushSize(this.memStoreSize);
+ incMemStoreSize(mss);
+ flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
if (flush) {
internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
@@ -5006,8 +5017,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
replayFlushInStores(flush, prepareFlushResult, true);
// Set down the memstore size by amount of flush.
- this.decrMemStoreSize(prepareFlushResult.totalFlushableSize);
-
+ this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize());
this.prepareFlushResult = null;
writestate.flushing = false;
} else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
@@ -5039,7 +5049,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
replayFlushInStores(flush, prepareFlushResult, true);
// Set down the memstore size by amount of flush.
- this.decrMemStoreSize(prepareFlushResult.totalFlushableSize);
+ this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize());
// Inspect the memstore contents to see whether the memstore contains only edits
// with seqId smaller than the flush seqId. If so, we can discard those edits.
@@ -5143,7 +5153,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException
*/
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
- MemStoreSizing totalFreedSize = new MemStoreSizing();
+ MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
this.updatesLock.writeLock().lock();
try {
@@ -5170,7 +5180,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
this.updatesLock.writeLock().unlock();
}
- return totalFreedSize;
+ return totalFreedSize.getMemStoreSize();
}
private MemStoreSize doDropStoreMemStoreContentsForSeqId(HStore s, long currentSeqId)
@@ -5293,9 +5303,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
null : this.prepareFlushResult.storeFlushCtxs.get(family);
if (ctx != null) {
- MemStoreSize snapshotSize = store.getFlushableSize();
+ MemStoreSize mss = store.getFlushableSize();
ctx.abort();
- this.decrMemStoreSize(snapshotSize);
+ this.decrMemStoreSize(mss);
this.prepareFlushResult.storeFlushCtxs.remove(family);
}
}
@@ -5487,12 +5497,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
null : this.prepareFlushResult.storeFlushCtxs.get(
store.getColumnFamilyDescriptor().getName());
if (ctx != null) {
- MemStoreSize snapshotSize = store.getFlushableSize();
+ MemStoreSize mss = store.getFlushableSize();
ctx.abort();
- this.decrMemStoreSize(snapshotSize);
- this.prepareFlushResult.storeFlushCtxs.remove(
- store.getColumnFamilyDescriptor().getName());
- totalFreedDataSize += snapshotSize.getDataSize();
+ this.decrMemStoreSize(mss);
+ this.prepareFlushResult.storeFlushCtxs.
+ remove(store.getColumnFamilyDescriptor().getName());
+ totalFreedDataSize += mss.getDataSize();
}
}
}
@@ -7374,8 +7384,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return null;
}
ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
- stats.setMemStoreLoad((int) (Math.min(100, (this.memStoreSize.getHeapSize() * 100) / this
- .memstoreFlushSize)));
+ stats.setMemStoreLoad((int) (Math.min(100,
+ (this.memStoreSizing.getMemStoreSize().getHeapSize() * 100) / this.memstoreFlushSize)));
if (rsServices.getHeapMemoryManager() != null) {
// the HeapMemoryManager uses -0.0 to signal a problem asking the JVM,
// so we could just do the calculation below and we'll get a 0.
@@ -7436,7 +7446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This is assigned by mvcc either explicity in the below or in the guts of the WAL append
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
WriteEntry writeEntry = null;
- MemStoreSizing memstoreAccounting = new MemStoreSizing();
+ MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
try {
boolean success = false;
try {
@@ -7522,7 +7532,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
closeRegionOperation();
if (!mutations.isEmpty()) {
- this.incMemStoreSize(memstoreAccounting);
+ this.incMemStoreSize(memstoreAccounting.getMemStoreSize());
requestFlushIfNeeded();
}
}
@@ -7626,7 +7636,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(op);
List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
RowLock rowLock = null;
- MemStoreSizing memstoreAccounting = new MemStoreSizing();
+ MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
try {
rowLock = getRowLockInternal(mutation.getRow(), false, null);
lock(this.updatesLock.readLock());
@@ -7676,7 +7686,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock.release();
}
// Request a cache flush if over the limit. Do it outside update lock.
- incMemStoreSize(memstoreAccounting);
+ incMemStoreSize(memstoreAccounting.getMemStoreSize());
requestFlushIfNeeded();
closeRegionOperation(op);
if (this.metricsRegion != null) {
@@ -8557,7 +8567,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
private void requestFlushIfNeeded() throws RegionTooBusyException {
- if(isFlushSize(memStoreSize)) {
+ if(isFlushSize(this.memStoreSizing.getMemStoreSize())) {
requestFlush();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9e89e5e..9494e18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2272,7 +2272,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
this.cacheFlushCount = snapshot.getCellsCount();
this.cacheFlushSize = snapshot.getDataSize();
committedFiles = new ArrayList<>(1);
- return new MemStoreSize(snapshot.getMemStoreSize());
+ return snapshot.getMemStoreSize();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index af7e5d5..910eaed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -50,12 +50,11 @@ public interface MemStore {
void clearSnapshot(long id) throws UnexpectedStateException;
/**
- * On flush, how much memory we will clear.
* Flush will first clear out the data in snapshot if any (It will take a second flush
* invocation to clear the current Cell set). If snapshot is empty, current
* Cell set will be flushed.
*
- * @return size of data that is going to be flushed
+ * @return On flush, how much memory we will clear.
*/
MemStoreSize getFlushableSize();
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index ad8aa46..2a49797 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -555,8 +555,9 @@ class MemStoreFlusher implements FlushRequester {
// If this is first time we've been put off, then emit a log message.
if (fqe.getRequeueCount() <= 0) {
// Note: We don't impose blockingStoreFiles constraint on meta regions
- LOG.warn("Region " + region.getRegionInfo().getEncodedName() + " has too many " +
- "store files; delaying flush up to " + this.blockingWaitTime + "ms");
+ LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
+ region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
+ this.blockingWaitTime);
if (!this.server.compactSplitThread.requestSplit(region)) {
try {
this.server.compactSplitThread.requestSystemCompaction(region,
@@ -677,6 +678,14 @@ class MemStoreFlusher implements FlushRequester {
return false;
}
+ private int getStoreFileCount(Region region) {
+ int count = 0;
+ for (Store store : region.getStores()) {
+ count += store.getStorefilesCount();
+ }
+ return count;
+ }
+
/**
* Check if the regionserver's memstore memory usage is greater than the
* limit. If so, flush regions with the biggest memstores until we're down
@@ -760,10 +769,10 @@ class MemStoreFlusher implements FlushRequester {
}
}
- private void logMsg(String string1, long val, long max) {
- LOG.info("Blocking updates on " + server.toString() + ": " + string1 + " "
- + TraditionalBinaryPrefix.long2String(val, "", 1) + " is >= than blocking "
- + TraditionalBinaryPrefix.long2String(max, "", 1) + " size");
+ private void logMsg(String type, long val, long max) {
+ LOG.info("Blocking updates: {} {} is >= blocking {}", type,
+ TraditionalBinaryPrefix.long2String(val, "", 1),
+ TraditionalBinaryPrefix.long2String(max, "", 1));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
index ec79e8d..97a416e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
@@ -19,64 +19,56 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
-
/**
- * Reports the data size part and total heap space occupied by the MemStore.
- * Read-only.
+ * Data structure of three longs.
+ * Convenient package in which to carry current state of three counters.
+ * <p>Immutable!</p>
* @see MemStoreSizing
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class MemStoreSize {
- // MemStore size tracks 3 sizes:
- // (1) data size: the aggregated size of all key-value not including meta data such as
- // index, time range etc.
- // (2) heap size: the aggregated size of all data that is allocated on-heap including all
- // key-values that reside on-heap and the metadata that resides on-heap
- // (3) off-heap size: the aggregated size of all data that is allocated off-heap including all
- // key-values that reside off-heap and the metadata that resides off-heap
- //
- // 3 examples to illustrate their usage:
- // Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
- // on-heap. The counters are <100MB, 120MB, 0>, respectively.
- // Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
- // allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
- // Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
- // are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
- // off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
- // The counters are <100MB, 10MB, 110MB>, respectively.
-
/**
*'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
- * be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap
- * or off heap LABs
+ * be in on heap or off heap area depending on the MSLAB and its configuration to be using on
+ * heap or off heap LABs
*/
- protected volatile long dataSize;
+ private final long dataSize;
- /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
+ /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
* When Cells in on heap area, this will include the cells data size as well.
*/
- protected volatile long heapSize;
+ private final long heapSize;
/** off-heap size: the aggregated size of all data that is allocated off-heap including all
* key-values that reside off-heap and the metadata that resides off-heap
*/
- protected volatile long offHeapSize;
+ private final long offHeapSize;
- public MemStoreSize() {
+ /**
+ * Package private constructor.
+ */
+ MemStoreSize() {
this(0L, 0L, 0L);
}
- public MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
+ /**
+ * Package private constructor.
+ */
+ MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
this.dataSize = dataSize;
this.heapSize = heapSize;
this.offHeapSize = offHeapSize;
}
- protected MemStoreSize(MemStoreSize memStoreSize) {
- this.dataSize = memStoreSize.dataSize;
- this.heapSize = memStoreSize.heapSize;
- this.offHeapSize = memStoreSize.offHeapSize;
+ /**
+ * Package private constructor.
+ */
+ MemStoreSize(MemStoreSize memStoreSize) {
+ this.dataSize = memStoreSize.getDataSize();
+ this.heapSize = memStoreSize.getHeapSize();
+ this.offHeapSize = memStoreSize.getOffHeapSize();
}
+
public boolean isEmpty() {
return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0;
}
@@ -101,24 +93,22 @@ public class MemStoreSize {
if (!(obj instanceof MemStoreSize)) {
return false;
}
- MemStoreSize other = (MemStoreSize) obj;
- return this.dataSize == other.dataSize
- && this.heapSize == other.heapSize
- && this.offHeapSize == other.offHeapSize;
+ MemStoreSize other = (MemStoreSize)obj;
+ return this.dataSize == other.dataSize && this.heapSize == other.heapSize &&
+ this.offHeapSize == other.offHeapSize;
}
@Override
public int hashCode() {
- long h = 13 * this.dataSize;
- h = h + 14 * this.heapSize;
- h = h + 15 * this.offHeapSize;
+ long h = 31 * this.dataSize;
+ h = h + 31 * this.heapSize;
+ h = h + 31 * this.offHeapSize;
return (int) h;
}
@Override
public String toString() {
- return "dataSize=" + this.dataSize
- + " , heapSize=" + this.heapSize
- + " , offHeapSize=" + this.offHeapSize;
+ return "dataSize=" + this.dataSize + ", getHeapSize=" + this.heapSize +
+ ", getOffHeapSize=" + this.offHeapSize;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
index 0b3e925..8430ac6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
@@ -21,61 +21,96 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Accounting of current heap and data sizes.
- * Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only.
- * For internal use.
- * @see MemStoreSize
+ * Tracks 3 sizes:
+ * <ol>
+ * <li>data size: the aggregated size of all key-value not including meta data such as
+ * index, time range etc.
+ * </li>
+ * <li>heap size: the aggregated size of all data that is allocated on-heap including all
+ * key-values that reside on-heap and the metadata that resides on-heap
+ * </li>
+ * <li>off-heap size: the aggregated size of all data that is allocated off-heap including all
+ * key-values that reside off-heap and the metadata that resides off-heap
+ * </li>
+ * </ol>
+ *
+ * 3 examples to illustrate their usage:
+ * <p>
+ * Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
+ * on-heap. The counters are {@code <100MB, 120MB, 0>}, respectively.
+ * </p>
+ * <p>Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
+ * allocated on-heap (e.g, CAM index). The counters are {@code <100MB, 20MB, 100MB>}, respectively.
+ * </p>
+ * <p>
+ * Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
+ * are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
+ * off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
+ * The counters are {@code <100MB, 10MB, 110MB>}, respectively.
+ * </p>
+ *
+ * Like {@link TimeRangeTracker}, it has thread-safe and non-thread-safe implementations.
*/
@InterfaceAudience.Private
-public class MemStoreSizing extends MemStoreSize {
- public static final MemStoreSizing DUD = new MemStoreSizing() {
+public interface MemStoreSizing {
+ static final MemStoreSizing DUD = new MemStoreSizing() {
+ private final MemStoreSize mss = new MemStoreSize();
- @Override public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
- long offHeapSizeDelta) {
- throw new RuntimeException("I'm a dud, you can't use me!");
+ @Override
+ public MemStoreSize getMemStoreSize() {
+ return this.mss;
}
- @Override public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
- long offHeapSizeDelta) {
- throw new RuntimeException("I'm a dud, you can't use me!");
+ @Override
+ public long getDataSize() {
+ return this.mss.getDataSize();
}
- };
- public MemStoreSizing() {
- super();
- }
+ @Override
+ public long getHeapSize() {
+ return this.mss.getHeapSize();
+ }
- public MemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
- super(dataSize, heapSize, offHeapSize);
- }
+ @Override
+ public long getOffHeapSize() {
+ return this.mss.getOffHeapSize();
+ }
- public MemStoreSizing(MemStoreSize memStoreSize) {
- super(memStoreSize);
- }
+ @Override
+ public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
+ long offHeapSizeDelta) {
+ throw new RuntimeException("I'm a DUD, you can't use me!");
+ }
+ };
- public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
- this.dataSize += dataSizeDelta;
- this.heapSize += heapSizeDelta;
- this.offHeapSize += offHeapSizeDelta;
- }
+ /**
+ * @return The new dataSize ONLY as a convenience
+ */
+ long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta);
- public void incMemStoreSize(MemStoreSize delta) {
- incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
+ default long incMemStoreSize(MemStoreSize delta) {
+ return incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
}
- public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
- this.dataSize -= dataSizeDelta;
- this.heapSize -= heapSizeDelta;
- this.offHeapSize -= offHeapSizeDelta;
+ /**
+ * @return The new dataSize ONLY as a convenience
+ */
+ default long decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
+ long offHeapSizeDelta) {
+ return incMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
}
- public void decMemStoreSize(MemStoreSize delta) {
- decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
+ default long decMemStoreSize(MemStoreSize delta) {
+ return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize());
}
- public void empty() {
- this.dataSize = 0L;
- this.heapSize = 0L;
- this.offHeapSize = 0L;
- }
+ long getDataSize();
+ long getHeapSize();
+ long getOffHeapSize();
+ /**
+ * @return Use this datastructure to return all three settings, {@link #getDataSize()},
+ * {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go.
+ */
+ MemStoreSize getMemStoreSize();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 1349921..c72d385 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -44,7 +44,7 @@ public class MutableSegment extends Segment {
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
- incSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
+ incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
}
/**
@@ -89,7 +89,7 @@ public class MutableSegment extends Segment {
int cellLen = getCellLength(cur);
long heapSize = heapSizeChange(cur, true);
long offHeapSize = offHeapSizeChange(cur, true);
- this.incSize(-cellLen, -heapSize, -offHeapSize);
+ incMemStoreSize(-cellLen, -heapSize, -offHeapSize);
if (memStoreSizing != null) {
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
new file mode 100644
index 0000000..601ff33
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Accounting of current heap and data sizes.
+ * <em>NOT THREAD SAFE</em>.
+ * Use in a 'local' context only where just a single-thread is updating. No concurrency!
+ * Used, for example, when summing all Cells in a single batch where result is then applied to the
+ * Store.
+ * @see ThreadSafeMemStoreSizing
+ */
+@InterfaceAudience.Private
+class NonThreadSafeMemStoreSizing implements MemStoreSizing {
+ private long dataSize = 0;
+ private long heapSize = 0;
+ private long offHeapSize = 0;
+
+ NonThreadSafeMemStoreSizing() {
+ this(0, 0, 0);
+ }
+
+ NonThreadSafeMemStoreSizing(MemStoreSize mss) {
+ this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
+ }
+
+ NonThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
+ incMemStoreSize(dataSize, heapSize, offHeapSize);
+ }
+
+ @Override
+ public MemStoreSize getMemStoreSize() {
+ return new MemStoreSize(this.dataSize, this.heapSize, this.offHeapSize);
+ }
+
+ @Override
+ public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
+ long offHeapSizeDelta) {
+ this.offHeapSize += offHeapSizeDelta;
+ this.heapSize += heapSizeDelta;
+ this.dataSize += dataSizeDelta;
+ return this.dataSize;
+ }
+
+ @Override
+ public long getDataSize() {
+ return dataSize;
+ }
+
+ @Override
+ public long getHeapSize() {
+ return heapSize;
+ }
+
+ @Override
+ public long getOffHeapSize() {
+ return offHeapSize;
+ }
+
+ @Override
+ public String toString() {
+ return getMemStoreSize().toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 1c627f7..4e66fc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -131,20 +131,20 @@ public class RegionServerAccounting {
return this.globalMemStoreOffHeapSize.sum();
}
- /**
- * @param memStoreSize the Memstore size will be added to
- * the global Memstore size
- */
- public void incGlobalMemStoreSize(MemStoreSize memStoreSize) {
- globalMemStoreDataSize.add(memStoreSize.getDataSize());
- globalMemStoreHeapSize.add(memStoreSize.getHeapSize());
- globalMemStoreOffHeapSize.add(memStoreSize.getOffHeapSize());
+ void incGlobalMemStoreSize(MemStoreSize mss) {
+ incGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
+ }
+
+ public void incGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
+ globalMemStoreDataSize.add(dataSizeDelta);
+ globalMemStoreHeapSize.add(heapSizeDelta);
+ globalMemStoreOffHeapSize.add(offHeapSizeDelta);
}
- public void decGlobalMemStoreSize(MemStoreSize memStoreSize) {
- globalMemStoreDataSize.add(-memStoreSize.getDataSize());
- globalMemStoreHeapSize.add(-memStoreSize.getHeapSize());
- globalMemStoreOffHeapSize.add(-memStoreSize.getOffHeapSize());
+ public void decGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
+ globalMemStoreDataSize.add(-dataSizeDelta);
+ globalMemStoreHeapSize.add(-heapSizeDelta);
+ globalMemStoreOffHeapSize.add(-offHeapSizeDelta);
}
/**
@@ -231,7 +231,7 @@ public class RegionServerAccounting {
// the region open operation. No need to handle multi thread issues on one region's entry in
// this Map.
if (replayEdistsSize == null) {
- replayEdistsSize = new MemStoreSizing();
+ replayEdistsSize = new ThreadSafeMemStoreSizing();
replayEditsPerRegion.put(regionName, replayEdistsSize);
}
replayEdistsSize.incMemStoreSize(memStoreSize);
@@ -244,10 +244,11 @@ public class RegionServerAccounting {
* @param regionName the region which could not open.
*/
public void rollbackRegionReplayEditsSize(byte[] regionName) {
- MemStoreSize replayEditsSize = replayEditsPerRegion.get(regionName);
- if (replayEditsSize != null) {
+ MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
+ if (replayEditsSizing != null) {
clearRegionReplayEditsSize(regionName);
- decGlobalMemStoreSize(replayEditsSize);
+ decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
+ replayEditsSizing.getOffHeapSize());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index 5b98a27..b088856 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -65,8 +65,8 @@ public class RegionServicesForStores {
region.unblockUpdates();
}
- public void addMemStoreSize(MemStoreSize size) {
- region.incMemStoreSize(size);
+ public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
+ region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
}
public RegionInfo getRegionInfo() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 8aafa42..10f9b24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -553,9 +553,6 @@ public class ScannerContext {
/**
* Set all fields together.
- * @param batch
- * @param sizeScope
- * @param dataSize
*/
void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize,
LimitScope timeScope, long time) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 517f537..7069bf8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -45,7 +45,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* segments from active set to snapshot set in the default implementation.
*/
@InterfaceAudience.Private
-public abstract class Segment {
+public abstract class Segment implements MemStoreSizing {
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing,
@@ -59,9 +59,9 @@ public abstract class Segment {
private final CellComparator comparator;
protected long minSequenceId;
private MemStoreLAB memStoreLAB;
- // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not
+ // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
// including the heap overhead of this class.
- protected final MemStoreSizing segmentSize;
+ protected final MemStoreSizing memStoreSizing;
protected final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent;
@@ -69,7 +69,9 @@ public abstract class Segment {
// and there is no need in true Segments state
protected Segment(CellComparator comparator, TimeRangeTracker trt) {
this.comparator = comparator;
- this.segmentSize = new MemStoreSizing();
+ // Do we need to be thread safe always? What if ImmutableSegment?
+ // DITTO for the TimeRangeTracker below.
+ this.memStoreSizing = new ThreadSafeMemStoreSizing();
this.timeRangeTracker = trt;
}
@@ -85,7 +87,9 @@ public abstract class Segment {
OffHeapSize += memStoreSize.getOffHeapSize();
}
this.comparator = comparator;
- this.segmentSize = new MemStoreSizing(dataSize, heapSize, OffHeapSize);
+ // Do we need to be thread safe always? What if ImmutableSegment?
+ // DITTO for the TimeRangeTracker below.
+ this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize);
this.timeRangeTracker = trt;
}
@@ -95,7 +99,9 @@ public abstract class Segment {
this.comparator = comparator;
this.minSequenceId = Long.MAX_VALUE;
this.memStoreLAB = memStoreLAB;
- this.segmentSize = new MemStoreSizing();
+ // Do we need to be thread safe always? What if ImmutableSegment?
+ // DITTO for the TimeRangeTracker below.
+ this.memStoreSizing = new ThreadSafeMemStoreSizing();
this.tagsPresent = false;
this.timeRangeTracker = trt;
}
@@ -105,7 +111,7 @@ public abstract class Segment {
this.comparator = segment.getComparator();
this.minSequenceId = segment.getMinSequenceId();
this.memStoreLAB = segment.getMemStoreLAB();
- this.segmentSize = new MemStoreSizing(segment.getMemStoreSize());
+ this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize());
this.tagsPresent = segment.isTagsPresent();
this.timeRangeTracker = segment.getTimeRangeTracker();
}
@@ -213,39 +219,29 @@ public abstract class Segment {
return this;
}
+ @Override
public MemStoreSize getMemStoreSize() {
- return this.segmentSize;
+ return this.memStoreSizing.getMemStoreSize();
}
- /**
- * @return Sum of all cell's size.
- */
- public long keySize() {
- return this.segmentSize.getDataSize();
+ @Override
+ public long getDataSize() {
+ return this.memStoreSizing.getDataSize();
}
- /**
- * @return The heap size of this segment.
- */
- public long heapSize() {
- return this.segmentSize.getHeapSize();
+ @Override
+ public long getHeapSize() {
+ return this.memStoreSizing.getHeapSize();
}
- /**
- * @return The off-heap size of this segment.
- */
- public long offHeapSize() {
- return this.segmentSize.getOffHeapSize();
+ @Override
+ public long getOffHeapSize() {
+ return this.memStoreSizing.getOffHeapSize();
}
- /**
- * Updates the size counters of the segment by the given delta
- */
- //TODO
- protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
- synchronized (this) {
- this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
- }
+ @Override
+ public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
+ return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
}
public long getMinSequenceId() {
@@ -308,7 +304,7 @@ public abstract class Segment {
}
long heapSize = heapSizeChange(cellToAdd, succ);
long offHeapSize = offHeapSizeChange(cellToAdd, succ);
- incSize(cellSize, heapSize, offHeapSize);
+ incMemStoreSize(cellSize, heapSize, offHeapSize);
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
}
@@ -408,8 +404,8 @@ public abstract class Segment {
String res = "type=" + this.getClass().getSimpleName() + ", ";
res += "empty=" + (isEmpty()? "yes": "no") + ", ";
res += "cellCount=" + getCellsCount() + ", ";
- res += "cellSize=" + keySize() + ", ";
- res += "totalHeapSize=" + heapSize() + ", ";
+ res += "cellSize=" + getDataSize() + ", ";
+ res += "totalHeapSize=" + getHeapSize() + ", ";
res += "min timestamp=" + timeRangeTracker.getMin() + ", ";
res += "max timestamp=" + timeRangeTracker.getMax();
return res;
http://git-wip-us.apache.org/repos/asf/hbase/blob/021f66d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
new file mode 100644
index 0000000..de05493
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Accounting of current heap and data sizes.
+ * Thread-safe. Many threads can do updates against this single instance.
+ * <p>
+ * Each of the three counters is an independent {@link AtomicLong}. They are not updated or read
+ * together under a common lock, so a reader running concurrently with
+ * {@link #incMemStoreSize(long, long, long)} may see a delta applied to some counters but not
+ * yet to the others.
+ * @see NonThreadSafeMemStoreSizing
+ * @see MemStoreSize
+ */
+@InterfaceAudience.Private
+class ThreadSafeMemStoreSizing implements MemStoreSizing {
+  // We used to tie the update of these three counters together so
+  // they all changed under one lock. This was
+  // undone. Doesn't seem necessary.
+  private final AtomicLong dataSize;
+  private final AtomicLong heapSize;
+  private final AtomicLong offHeapSize;
+
+  /** Creates a sizing whose three counters all start at zero. */
+  ThreadSafeMemStoreSizing() {
+    this(0, 0, 0);
+  }
+
+  /**
+   * Creates a sizing seeded from the values of the passed {@link MemStoreSize}.
+   * @param mss source of the initial data, heap and off-heap sizes
+   */
+  ThreadSafeMemStoreSizing(MemStoreSize mss) {
+    this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
+  }
+
+  /**
+   * Creates a sizing seeded with the passed values.
+   * @param dataSize initial data size
+   * @param heapSize initial heap size
+   * @param offHeapSize initial off-heap size
+   */
+  ThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
+    // Seed the counters directly instead of going through incMemStoreSize(): this class is
+    // not final, so the constructor must not dispatch to an overridable method.
+    this.dataSize = new AtomicLong(dataSize);
+    this.heapSize = new AtomicLong(heapSize);
+    this.offHeapSize = new AtomicLong(offHeapSize);
+  }
+
+  /**
+   * @return a new {@link MemStoreSize} built from three separate reads of the counters
+   */
+  @Override
+  public MemStoreSize getMemStoreSize() {
+    return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize());
+  }
+
+  /**
+   * Atomically adds each delta to its own counter. Pass negative values to decrement.
+   * @param dataSizeDelta amount to add to the data size
+   * @param heapSizeDelta amount to add to the heap size
+   * @param offHeapSizeDelta amount to add to the off-heap size
+   * @return the data size after the delta has been applied
+   */
+  @Override
+  public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
+    this.offHeapSize.addAndGet(offHeapSizeDelta);
+    this.heapSize.addAndGet(heapSizeDelta);
+    return this.dataSize.addAndGet(dataSizeDelta);
+  }
+
+  @Override
+  public long getDataSize() {
+    return dataSize.get();
+  }
+
+  @Override
+  public long getHeapSize() {
+    return heapSize.get();
+  }
+
+  @Override
+  public long getOffHeapSize() {
+    return offHeapSize.get();
+  }
+
+  /** Renders as the {@link MemStoreSize} snapshot of the current counters. */
+  @Override
+  public String toString() {
+    return getMemStoreSize().toString();
+  }
+}