You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/28 19:22:17 UTC

[2/3] hbase git commit: Revert "Reviving the merge of the compacting pipeline: making the limit on the number of the segments in the pipeline configurable, adding merge test, fixing bug in sizes counting"

Revert "Reviving the merge of the compacting pipeline: making the limit on the number of the segments in the pipeline configurable, adding merge test, fixing bug in sizes counting"

This reverts commit c77e2135db07b6417f5fea4577c2c7ae8d6d7008.

Bad commit message


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/046d4e18
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/046d4e18
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/046d4e18

Branch: refs/heads/hbase-12439
Commit: 046d4e183bb02c6bc2d00b6e67c1c30d315454a8
Parents: ea566e7
Author: Sean Busbey <bu...@apache.org>
Authored: Tue Mar 28 10:53:41 2017 -0500
Committer: Sean Busbey <bu...@apache.org>
Committed: Tue Mar 28 10:53:41 2017 -0500

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  |  9 +--
 .../hbase/regionserver/CompactionPipeline.java  |  8 +-
 .../hbase/regionserver/ImmutableSegment.java    |  4 +-
 .../hbase/regionserver/MemStoreCompactor.java   | 21 ++---
 .../TestWalAndCompactingMemStoreFlush.java      | 83 +++++++++-----------
 5 files changed, 50 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/046d4e18/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 0c56693..26b2f49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -279,8 +279,7 @@ public class CompactingMemStore extends AbstractMemStore {
 
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
       boolean merge) {
-    // last true stands for updating the region size
-    return pipeline.swap(versionedList, result, !merge, true);
+    return pipeline.swap(versionedList, result, !merge);
   }
 
   /**
@@ -438,8 +437,7 @@ public class CompactingMemStore extends AbstractMemStore {
   private void pushTailToSnapshot() {
     VersionedSegmentsList segments = pipeline.getVersionedTail();
     pushToSnapshot(segments.getStoreSegments());
-    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
-    pipeline.swap(segments,null,false, false);
+    pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
   }
 
   private void pushPipelineToSnapshot() {
@@ -451,8 +449,7 @@ public class CompactingMemStore extends AbstractMemStore {
       pushToSnapshot(segments.getStoreSegments());
       // swap can return false in case the pipeline was updated by an ongoing compaction
       // and the version increased; the chance of it happening is very low
-      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
-      done = pipeline.swap(segments, null, false, false);
+      done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
       if (iterationsCnt>2) {
         // practically it is impossible that this loop iterates more than two times
         // (because the compaction is stopped and none restarts it while in snapshot request),

http://git-wip-us.apache.org/repos/asf/hbase/blob/046d4e18/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 06e83a3..e64c0fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -106,16 +106,12 @@ public class CompactionPipeline {
    *                removed.
    * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
    *        During index merge op this will be false and for compaction it will be true.
-   * @param updateRegionSize whether to update the region size. Update the region size,
-   *                         when the pipeline is swapped as part of in-memory-flush and
-   *                         further merge/compaction. Don't update the region size when the
-   *                         swap is result of the snapshot (flush-to-disk).
    * @return true iff swapped tail with new segment
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
       justification="Increment is done under a synchronize block so safe")
   public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
-      boolean closeSuffix, boolean updateRegionSize) {
+      boolean closeSuffix) {
     if (versionedList.getVersion() != version) {
       return false;
     }
@@ -139,7 +135,7 @@ public class CompactionPipeline {
       readOnlyCopy = new LinkedList<>(pipeline);
       version++;
     }
-    if (updateRegionSize && region != null) {
+    if (closeSuffix && region != null) {
       // update the global memstore size counter
       long suffixDataSize = getSegmentsKeySize(suffix);
       long newDataSize = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/046d4e18/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 19b66b4..f1273a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -99,7 +99,7 @@ public class ImmutableSegment extends Segment {
     super(null, // initialize the CellSet with NULL
         comparator, memStoreLAB);
     this.type = type;
-    // build the new CellSet based on CellArrayMap
+    // build the true CellSet based on CellArrayMap
     CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
 
     this.setCellSet(null, cs);            // update the CellSet of the new Segment
@@ -203,7 +203,7 @@ public class ImmutableSegment extends Segment {
         cells[i] = maybeCloneWithAllocator(c);
       }
       boolean useMSLAB = (getMemStoreLAB()!=null);
-      // second parameter true, because in compaction/merge the addition of the cell to new segment
+      // second parameter true, because in compaction addition of the cell to new segment
       // is always successful
       updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
       i++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/046d4e18/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 0d3f47e..dfa7d18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -44,26 +44,22 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @InterfaceAudience.Private
 public class MemStoreCompactor {
 
-  // The upper bound for the number of segments we store in the pipeline prior to merging.
-  // This constant is subject to further experimentation.
-  // The external setting of the compacting MemStore behaviour
-  public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
-      "hbase.hregion.compacting.pipeline.segments.limit";
-  // remaining with the same ("infinity") but configurable default for now
-  public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30;
-
   public static final long DEEP_OVERHEAD = ClassSize
       .align(ClassSize.OBJECT
           + 4 * ClassSize.REFERENCE
           // compactingMemStore, versionedList, action, isInterrupted (the reference)
           // "action" is an enum and thus it is a class with static final constants,
           // so counting only the size of the reference to it and not the size of the internals
-          + 2 * Bytes.SIZEOF_INT        // compactionKVMax, pipelineThreshold
+          + Bytes.SIZEOF_INT            // compactionKVMax
           + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
       );
 
+  // The upper bound for the number of segments we store in the pipeline prior to merging.
+  // This constant is subject to further experimentation.
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
+
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
-  private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
+
   private CompactingMemStore compactingMemStore;
 
   // a static version of the segment list from the pipeline
@@ -95,9 +91,6 @@ public class MemStoreCompactor {
     this.compactionKVMax = compactingMemStore.getConfiguration()
         .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
     initiateAction(compactionPolicy);
-    pipelineThreshold =         // get the limit on the number of the segments in the pipeline
-        compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
-            COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
   }
 
   /**----------------------------------------------------------------------
@@ -168,7 +161,7 @@ public class MemStoreCompactor {
     // compaction shouldn't happen or doesn't worth it
     // limit the number of the segments in the pipeline
     int numOfSegments = versionedList.getNumOfSegments();
-    if (numOfSegments > pipelineThreshold) {
+    if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
       LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
           + " is going to be merged, as there are " + numOfSegments + " segments");
       return Action.MERGE;          // to avoid too many segments, merge now

http://git-wip-us.apache.org/repos/asf/hbase/blob/046d4e18/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index aae0a4d..57eee30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -756,24 +756,22 @@ public class TestWalAndCompactingMemStoreFlush {
   }
 
   @Test(timeout = 180000)
-  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
+  public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
         FlushNonSloppyStoresFirstPolicy.class.getName());
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
-        75 * 1024);
+        200 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
-    // set memstore to do index compaction with merge
+    // set memstore to do index compaction (BASIC policy) and not to use the speculative scan
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(MemoryCompactionPolicy.BASIC));
-    // length of pipeline that requires merge
-    conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
 
     // Initialize the HRegion
-    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
-    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
+    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));
       if (i <= 100) {
@@ -783,7 +781,7 @@ public class TestWalAndCompactingMemStoreFlush {
         }
       }
     }
-    // Now put more entries to CF2
+    // Now add more puts for CF2, so that we only flush CF2 to disk
     for (int i = 100; i < 2000; i++) {
       region.put(createPut(2, i));
     }
@@ -802,14 +800,13 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(totalMemstoreSize,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
-            + cf3MemstoreSizePhaseI.getDataSize());
+    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
-    // Initiate in-memory Flush!
+    // Flush!
     ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
     ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
-    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
+    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
     while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
         .isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
@@ -818,22 +815,21 @@ public class TestWalAndCompactingMemStoreFlush {
         .isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
     }
-
-    // Flush-to-disk! CF2 only should be flushed
     region.flush(false);
 
-    MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
-    MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
 
-    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
-    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
-    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
-    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
+    long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL()
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
+
     // CF2 should have been cleared
     assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
+    assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
 
-    // Add the same amount of entries to see the merging
+    // Add same entries to compact them later
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));
       if (i <= 100) {
@@ -848,12 +844,16 @@ public class TestWalAndCompactingMemStoreFlush {
       region.put(createPut(2, i));
     }
 
-    MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
+    long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL()
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
 
-    // Flush in memory!
+    // Flush!
     ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
     ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
-    // CF1 and CF3 should be merged so wait here to be sure the merge is done
+    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
     while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
         .isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
@@ -864,28 +864,17 @@ public class TestWalAndCompactingMemStoreFlush {
     }
     region.flush(false);
 
-    MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
-    MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
+    long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL()
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
 
-    assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
-    assertEquals(
-        cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
-        cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize());
-    assertEquals(3, // active, one in pipeline, snapshot
-        ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
-    // CF2 should have been cleared
-    assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
-            + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII
-            .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII
-            .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
-            + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
-            + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
-            + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
-            + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
-            + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV
-            .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV
-            .getHeapSize() + "\n",
-        0, cf2MemstoreSizePhaseIV.getDataSize());
+    // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge
+    assertFalse(
+        smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
+    assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
+    assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
 
     HBaseTestingUtility.closeRegionAndWAL(region);
   }