You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2017/03/27 12:41:52 UTC
hbase git commit: Reviving the merge of the compacting pipeline:
making the limit on the number of the segments in the pipeline configurable,
adding merge test, fixing bug in sizes counting
Repository: hbase
Updated Branches:
refs/heads/master 04fc45503 -> c77e2135d
Reviving the merge of the compacting pipeline: making the limit on the number of the segments in the pipeline configurable, adding merge test, fixing bug in sizes counting
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c77e2135
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c77e2135
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c77e2135
Branch: refs/heads/master
Commit: c77e2135db07b6417f5fea4577c2c7ae8d6d7008
Parents: 04fc455
Author: anastas <an...@yahoo-inc.com>
Authored: Mon Mar 27 15:41:32 2017 +0300
Committer: anastas <an...@yahoo-inc.com>
Committed: Mon Mar 27 15:41:32 2017 +0300
----------------------------------------------------------------------
.../hbase/regionserver/CompactingMemStore.java | 9 ++-
.../hbase/regionserver/CompactionPipeline.java | 8 +-
.../hbase/regionserver/ImmutableSegment.java | 4 +-
.../hbase/regionserver/MemStoreCompactor.java | 21 +++--
.../TestWalAndCompactingMemStoreFlush.java | 83 +++++++++++---------
5 files changed, 75 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c77e2135/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 26b2f49..0c56693 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -279,7 +279,8 @@ public class CompactingMemStore extends AbstractMemStore {
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
- return pipeline.swap(versionedList, result, !merge);
+ // last true stands for updating the region size
+ return pipeline.swap(versionedList, result, !merge, true);
}
/**
@@ -437,7 +438,8 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushTailToSnapshot() {
VersionedSegmentsList segments = pipeline.getVersionedTail();
pushToSnapshot(segments.getStoreSegments());
- pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
+ // In Swap: don't close segments (they are in snapshot now) and don't update the region size
+ pipeline.swap(segments,null,false, false);
}
private void pushPipelineToSnapshot() {
@@ -449,7 +451,8 @@ public class CompactingMemStore extends AbstractMemStore {
pushToSnapshot(segments.getStoreSegments());
// swap can return false in case the pipeline was updated by ongoing compaction
// and the version increase, the chance of it happenning is very low
- done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
+ // In Swap: don't close segments (they are in snapshot now) and don't update the region size
+ done = pipeline.swap(segments, null, false, false);
if (iterationsCnt>2) {
// practically it is impossible that this loop iterates more than two times
// (because the compaction is stopped and none restarts it while in snapshot request),
http://git-wip-us.apache.org/repos/asf/hbase/blob/c77e2135/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e64c0fb..06e83a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -106,12 +106,16 @@ public class CompactionPipeline {
* removed.
* @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
* During index merge op this will be false and for compaction it will be true.
+ * @param updateRegionSize whether to update the region size. Update the region size,
+ * when the pipeline is swapped as part of in-memory-flush and
+ * further merge/compaction. Don't update the region size when the
+ * swap is result of the snapshot (flush-to-disk).
* @return true iff swapped tail with new segment
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
justification="Increment is done under a synchronize block so safe")
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
- boolean closeSuffix) {
+ boolean closeSuffix, boolean updateRegionSize) {
if (versionedList.getVersion() != version) {
return false;
}
@@ -135,7 +139,7 @@ public class CompactionPipeline {
readOnlyCopy = new LinkedList<>(pipeline);
version++;
}
- if (closeSuffix && region != null) {
+ if (updateRegionSize && region != null) {
// update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix);
long newDataSize = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c77e2135/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index f1273a9..19b66b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -99,7 +99,7 @@ public class ImmutableSegment extends Segment {
super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB);
this.type = type;
- // build the true CellSet based on CellArrayMap
+ // build the new CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
this.setCellSet(null, cs); // update the CellSet of the new Segment
@@ -203,7 +203,7 @@ public class ImmutableSegment extends Segment {
cells[i] = maybeCloneWithAllocator(c);
}
boolean useMSLAB = (getMemStoreLAB()!=null);
- // second parameter true, because in compaction addition of the cell to new segment
+ // second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
i++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c77e2135/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index dfa7d18..0d3f47e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -44,22 +44,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceAudience.Private
public class MemStoreCompactor {
+ // The upper bound for the number of segments we store in the pipeline prior to merging.
+ // This constant is subject to further experimentation.
+ // The external setting of the compacting MemStore behaviour
+ public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
+ "hbase.hregion.compacting.pipeline.segments.limit";
+ // keep the same (effectively "infinite") default for now, but make it configurable
+ public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30;
+
public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT
+ 4 * ClassSize.REFERENCE
// compactingMemStore, versionedList, action, isInterrupted (the reference)
// "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals
- + Bytes.SIZEOF_INT // compactionKVMax
+ + 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
);
- // The upper bound for the number of segments we store in the pipeline prior to merging.
- // This constant is subject to further experimentation.
- private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
-
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
-
+ private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline
@@ -91,6 +95,9 @@ public class MemStoreCompactor {
this.compactionKVMax = compactingMemStore.getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
initiateAction(compactionPolicy);
+ pipelineThreshold = // get the limit on the number of the segments in the pipeline
+ compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
+ COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
}
/**----------------------------------------------------------------------
@@ -161,7 +168,7 @@ public class MemStoreCompactor {
// compaction shouldn't happen or doesn't worth it
// limit the number of the segments in the pipeline
int numOfSegments = versionedList.getNumOfSegments();
- if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
+ if (numOfSegments > pipelineThreshold) {
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be merged, as there are " + numOfSegments + " segments");
return Action.MERGE; // to avoid too many segments, merge now
http://git-wip-us.apache.org/repos/asf/hbase/blob/c77e2135/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 57eee30..aae0a4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -756,22 +756,24 @@ public class TestWalAndCompactingMemStoreFlush {
}
@Test(timeout = 180000)
- public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
+ public void testSelectiveFlushWithBasicAndMerge() throws IOException {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
- conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
- 200 * 1024);
+ 75 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
- // set memstore to do data compaction and not to use the speculative scan
+ // set memstore to do index compaction with merge
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.BASIC));
+ // length of pipeline that requires merge
+ conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
// Intialize the HRegion
- HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
- // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+ HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
+ // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
if (i <= 100) {
@@ -781,7 +783,7 @@ public class TestWalAndCompactingMemStoreFlush {
}
}
}
- // Now add more puts for CF2, so that we only flush CF2 to disk
+ // Now put more entries to CF2
for (int i = 100; i < 2000; i++) {
region.put(createPut(2, i));
}
@@ -800,13 +802,14 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
- assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
- + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
+ assertEquals(totalMemstoreSize,
+ cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
+ + cf3MemstoreSizePhaseI.getDataSize());
- // Flush!
+ // Initiate in-memory Flush!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
- // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
+ // CF1 and CF3 should be flattened and merged, so wait here to be sure the merge is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
@@ -815,21 +818,22 @@ public class TestWalAndCompactingMemStoreFlush {
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
+
+ // Flush-to-disk! CF2 only should be flushed
region.flush(false);
+ MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
+ MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
- long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL()
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
- long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
- long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
- long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
-
+ // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
+ assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
+ // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
+ assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
// CF2 should have been cleared
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
- assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
- // Add same entries to compact them later
+ // Add the same amount of entries to see the merging
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
if (i <= 100) {
@@ -844,16 +848,12 @@ public class TestWalAndCompactingMemStoreFlush {
region.put(createPut(2, i));
}
- long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL()
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
- long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
- long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
- long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
+ MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
- // Flush!
+ // Flush in memory!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
- // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
+ // CF1 and CF3 should be merged so wait here to be sure the merge is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
@@ -864,17 +864,28 @@ public class TestWalAndCompactingMemStoreFlush {
}
region.flush(false);
- long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL()
- .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
- long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
- long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
- long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
+ MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
+ MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
- // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge
- assertFalse(
- smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
- assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
- assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
+ assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
+ assertEquals(
+ cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
+ cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize());
+ assertEquals(3, // active, one in pipeline, snapshot
+ ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
+ // CF2 should have been cleared
+ assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
+ + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII
+ .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII
+ .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
+ + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
+ + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
+ + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
+ + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
+ + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV
+ .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV
+ .getHeapSize() + "\n",
+ 0, cf2MemstoreSizePhaseIV.getDataSize());
HBaseTestingUtility.closeRegionAndWAL(region);
}