You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/12/28 18:53:12 UTC

hbase git commit: HBASE-17081 Flush the entire CompactingMemStore content to disk - revert due to failure in TestHRegionWithInMemoryFlush

Repository: hbase
Updated Branches:
  refs/heads/master da97569ea -> 79e5efd35


HBASE-17081 Flush the entire CompactingMemStore content to disk - revert due to failure in TestHRegionWithInMemoryFlush


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79e5efd3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79e5efd3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79e5efd3

Branch: refs/heads/master
Commit: 79e5efd35c9f3660b8c58364f25816581fb84d7a
Parents: da97569
Author: tedyu <yu...@gmail.com>
Authored: Wed Dec 28 10:53:07 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Wed Dec 28 10:53:07 2016 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    |  35 +-
 .../hbase/regionserver/CompactingMemStore.java  |  83 ++---
 .../hbase/regionserver/CompactionPipeline.java  |  34 +-
 .../regionserver/CompositeImmutableSegment.java | 352 -------------------
 .../hbase/regionserver/DefaultMemStore.java     |  23 +-
 .../hadoop/hbase/regionserver/HRegion.java      |   5 +-
 .../hbase/regionserver/ImmutableSegment.java    |  23 +-
 .../hbase/regionserver/MemStoreCompactor.java   |   4 +-
 .../hadoop/hbase/regionserver/MemstoreSize.java |  25 +-
 .../hadoop/hbase/regionserver/Segment.java      |  21 +-
 .../hbase/regionserver/SegmentFactory.java      |  10 -
 .../regionserver/TestCompactingMemStore.java    |   8 +-
 .../hbase/regionserver/TestDefaultMemStore.java |  12 +-
 .../TestWalAndCompactingMemStoreFlush.java      | 238 ++++++-------
 14 files changed, 175 insertions(+), 698 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 8564045..225dd73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -159,12 +159,14 @@ public abstract class AbstractMemStore implements MemStore {
   public String toString() {
     StringBuffer buf = new StringBuffer();
     int i = 1;
-
-    for (Segment segment : getSegments()) {
-      buf.append("Segment (" + i + ") " + segment.toString() + "; ");
-      i++;
+    try {
+      for (Segment segment : getSegments()) {
+        buf.append("Segment (" + i + ") " + segment.toString() + "; ");
+        i++;
+      }
+    } catch (IOException e){
+      return e.toString();
     }
-
     return buf.toString();
   }
 
@@ -230,7 +232,6 @@ public abstract class AbstractMemStore implements MemStore {
    * @return Next row or null if none found.  If one found, will be a new
    * KeyValue -- can be destroyed by subsequent calls to this method.
    */
-  @VisibleForTesting
   protected Cell getNextRow(final Cell key,
       final NavigableSet<Cell> set) {
     Cell result = null;
@@ -248,26 +249,6 @@ public abstract class AbstractMemStore implements MemStore {
     return result;
   }
 
-  /**
-   * @param cell Find the row that comes after this one.  If null, we return the
-   *             first.
-   * @return Next row or null if none found.
-   */
-  @VisibleForTesting
-  Cell getNextRow(final Cell cell) {
-    Cell lowest = null;
-    List<Segment> segments = getSegments();
-    for (Segment segment : segments) {
-      if (lowest == null) {
-        //TODO: we may want to move the getNextRow ability to the segment
-        lowest = getNextRow(cell, segment.getCellSet());
-      } else {
-        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
-      }
-    }
-    return lowest;
-  }
-
   private Cell maybeCloneWithAllocator(Cell cell) {
     return active.maybeCloneWithAllocator(cell);
   }
@@ -326,6 +307,6 @@ public abstract class AbstractMemStore implements MemStore {
   /**
    * @return an ordered list of segments from most recent to oldest in memstore
    */
-  protected abstract List<Segment> getSegments();
+  protected abstract List<Segment> getSegments() throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 1cd30dd..f8192a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -72,7 +72,6 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
   @VisibleForTesting
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
-  private boolean compositeSnapshot = true;
 
   public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
       + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
@@ -161,12 +160,7 @@ public class CompactingMemStore extends AbstractMemStore {
       stopCompaction();
       pushActiveToPipeline(this.active);
       snapshotId = EnvironmentEdgeManager.currentTime();
-      // in both cases whatever is pushed to snapshot is cleared from the pipeline
-      if (compositeSnapshot) {
-        pushPipelineToSnapshot();
-      } else {
-        pushTailToSnapshot();
-      }
+      pushTailToSnapshot();
     }
     return new MemStoreSnapshot(snapshotId, this.snapshot);
   }
@@ -179,13 +173,8 @@ public class CompactingMemStore extends AbstractMemStore {
   public MemstoreSize getFlushableSize() {
     MemstoreSize snapshotSize = getSnapshotSize();
     if (snapshotSize.getDataSize() == 0) {
-      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
-      if (compositeSnapshot) {
-        snapshotSize = pipeline.getPipelineSize();
-        snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
-      } else {
-        snapshotSize = pipeline.getTailSize();
-      }
+      // if snapshot is empty the tail of the pipeline is flushed
+      snapshotSize = pipeline.getTailSize();
     }
     return snapshotSize.getDataSize() > 0 ? snapshotSize
         : new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
@@ -224,28 +213,16 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
-  // the getSegments() method is used for tests only
-  @VisibleForTesting
   @Override
   public List<Segment> getSegments() {
     List<Segment> pipelineList = pipeline.getSegments();
     List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
-    list.addAll(this.snapshot.getAllSegments());
-
+    list.add(this.snapshot);
     return list;
   }
 
-  // the following three methods allow to manipulate the settings of composite snapshot
-  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
-    this.compositeSnapshot = useCompositeSnapshot;
-  }
-
-  public boolean isCompositeSnapshot() {
-    return this.compositeSnapshot;
-  }
-
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
       boolean merge) {
     return pipeline.swap(versionedList, result, !merge);
@@ -285,20 +262,18 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
-
-    int order = 1;                        // for active segment
-    order += pipeline.size();             // for all segments in the pipeline
-    order += snapshot.getNumOfSegments(); // for all segments in the snapshot
-    // TODO: check alternatives to using this order
-    // The list of elements in pipeline + the active element + the snapshot segments
-    // The order is the Segment ordinal
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order);
-    list.add(this.active.getScanner(readPt, order));
-    order--;
-    list.addAll(pipeline.getScanners(readPt,order));
-    order -= pipeline.size();
-    list.addAll(snapshot.getScanners(readPt,order));
-    return Collections.<KeyValueScanner>singletonList(new MemStoreScanner(getComparator(), list));
+    List<Segment> pipelineList = pipeline.getSegments();
+    long order = pipelineList.size();
+    // The list of elements in pipeline + the active element + the snapshot segment
+    // TODO : This will change when the snapshot is made of more than one element
+    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
+    list.add(this.active.getScanner(readPt, order + 1));
+    for (Segment item : pipelineList) {
+      list.add(item.getScanner(readPt, order));
+      order--;
+    }
+    list.add(this.snapshot.getScanner(readPt, order));
+    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
   }
 
   /**
@@ -405,14 +380,6 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
-  private void pushPipelineToSnapshot() {
-    List<ImmutableSegment> segments = pipeline.drain();
-    if (!segments.isEmpty()) {
-      this.snapshot =
-          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
-    }
-  }
-
   private RegionServicesForStores getRegionServices() {
     return regionServices;
   }
@@ -460,6 +427,24 @@ public class CompactingMemStore extends AbstractMemStore {
     compactor.initiateAction(compactionType);
   }
 
+  /**
+   * @param cell Find the row that comes after this one.  If null, we return the
+   *             first.
+   * @return Next row or null if none found.
+   */
+  Cell getNextRow(final Cell cell) {
+    Cell lowest = null;
+    List<Segment> segments = getSegments();
+    for (Segment segment : segments) {
+      if (lowest == null) {
+        lowest = getNextRow(cell, segment.getCellSet());
+      } else {
+        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
+      }
+    }
+    return lowest;
+  }
+
   // debug method
   public void debug() {
     String msg = "active size=" + this.active.keySize();

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2fd2a14..6676170 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -78,19 +77,6 @@ public class CompactionPipeline {
     }
   }
 
-  public List<ImmutableSegment> drain() {
-    int drainSize = pipeline.size();
-    List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize);
-    synchronized (pipeline){
-      version++;
-      for(int i=0; i<drainSize; i++) {
-        ImmutableSegment segment = this.pipeline.removeFirst();
-        result.add(i,segment);
-      }
-      return result;
-    }
-  }
-
   public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
       LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
@@ -207,7 +193,8 @@ public class CompactionPipeline {
 
   public List<Segment> getSegments() {
     synchronized (pipeline){
-      return new LinkedList<Segment>(pipeline);
+      List<Segment> res = new LinkedList<Segment>(pipeline);
+      return res;
     }
   }
 
@@ -215,18 +202,6 @@ public class CompactionPipeline {
     return pipeline.size();
   }
 
-  public List<KeyValueScanner> getScanners(long readPoint, long order) {
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.pipeline.size());
-    for (Segment segment : this.pipeline) {
-      scanners.add(segment.getScanner(readPoint, order));
-      // The order is the Segment ordinal
-      order--;
-      assert order>=0; // order should never be negative so this is just a sanity check
-    }
-    return scanners;
-  }
-
-
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
     if (!isEmpty()) {
@@ -240,11 +215,6 @@ public class CompactionPipeline {
     return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
   }
 
-  public MemstoreSize getPipelineSize() {
-    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline));
-  }
-
   private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
     version++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
deleted file mode 100644
index 4fdd2d0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-
-/**
- * The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports
- * the interface of a single ImmutableSegments.
- * The CompositeImmutableSegments is planned to be used only as a snapshot,
- * thus only relevant interfaces are supported
- */
-@InterfaceAudience.Private
-public class CompositeImmutableSegment extends ImmutableSegment {
-
-  private final List<ImmutableSegment> segments;
-  private final CellComparator comparator;
-  // CompositeImmutableSegment is used for snapshots and snapshot should
-  // support getTimeRangeTracker() interface.
-  // Thus we hold a constant TRT build in the construction time from TRT of the given segments.
-  private final TimeRangeTracker timeRangeTracker;
-  private long keySize = 0;
-
-  // This scanner need to be remembered in order to close it when the snapshot is cleared.
-  // Initially CollectionBackedScanner didn't raise the scanner counters thus there was no
-  // need to close it. Now when MemStoreScanner is used instead we need to decrease the
-  // scanner counters.
-  private KeyValueScanner flushingScanner = null;
-
-  public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
-    super(comparator);
-    this.comparator = comparator;
-    this.segments = segments;
-    this.timeRangeTracker = new TimeRangeTracker();
-    for (ImmutableSegment s : segments) {
-      this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
-      this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
-      this.keySize += s.keySize();
-    }
-  }
-
-  @VisibleForTesting
-  public List<Segment> getAllSegments() {
-    return new LinkedList<Segment>(segments);
-  }
-
-  public long getNumOfSegments() {
-    return segments.size();
-  }
-
-  /**
-   * Builds a special scanner for the MemStoreSnapshot object that is different than the
-   * general segment scanner.
-   * @return a special scanner for the MemStoreSnapshot object
-   */
-  public KeyValueScanner getKeyValueScanner() {
-    KeyValueScanner scanner;
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
-    for (ImmutableSegment s : segments) {
-      list.add(s.getScanner(Long.MAX_VALUE));
-    }
-
-    try {
-      scanner = new MemStoreScanner(getComparator(), list);
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    }
-
-    flushingScanner = scanner;
-    return scanner;
-  }
-
-  @Override
-  public List<KeyValueScanner> getScanners(long readPoint, long order) {
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.segments.size());
-    for (Segment segment : this.segments) {
-      scanners.add(segment.getScanner(readPoint, order));
-      // The order is the Segment ordinal
-      order--;
-      // order should never be negative so this is just a sanity check
-      order = (order<0) ? 0 : order;
-    }
-    return scanners;
-  }
-
-  /**
-   * @return whether the segment has any cells
-   */
-  public boolean isEmpty() {
-    for (ImmutableSegment s : segments) {
-      if (!s.isEmpty()) return false;
-    }
-    return true;
-  }
-
-  /**
-   * @return number of cells in segment
-   */
-  public int getCellsCount() {
-    int result = 0;
-    for (ImmutableSegment s : segments) {
-      result += s.getCellsCount();
-    }
-    return result;
-  }
-
-  /**
-   * @return the first cell in the segment that has equal or greater key than the given cell
-   */
-  public Cell getFirstAfter(Cell cell) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * Closing a segment before it is being discarded
-   */
-  public void close() {
-    if (flushingScanner != null) {
-      flushingScanner.close();
-      flushingScanner = null;
-    }
-    for (ImmutableSegment s : segments) {
-      s.close();
-    }
-  }
-
-  /**
-   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
-   * otherwise the given cell is returned
-   * @return either the given cell or its clone
-   */
-  public Cell maybeCloneWithAllocator(Cell cell) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public long getMinTimestamp(){
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * Creates the scanner for the given read point
-   * @return a scanner for the given read point
-   */
-  public KeyValueScanner getScanner(long readPoint) {
-    KeyValueScanner resultScanner;
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
-    for (ImmutableSegment s : segments) {
-      list.add(s.getScanner(readPoint));
-    }
-
-    try {
-      resultScanner = new MemStoreScanner(getComparator(), list);
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    }
-
-    return resultScanner;
-  }
-
-  /**
-   * Creates the scanner for the given read point, and a specific order in a list
-   * @return a scanner for the given read point
-   */
-  public KeyValueScanner getScanner(long readPoint, long order) {
-    KeyValueScanner resultScanner;
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
-    for (ImmutableSegment s : segments) {
-      list.add(s.getScanner(readPoint,order));
-    }
-
-    try {
-      resultScanner = new MemStoreScanner(getComparator(), list);
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    }
-
-    return resultScanner;
-  }
-
-  public boolean isTagsPresent() {
-    for (ImmutableSegment s : segments) {
-      if (s.isTagsPresent()) return true;
-    }
-    return false;
-  }
-
-  public void incScannerCount() {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public void decScannerCount() {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * Setting the CellSet of the segment - used only for flat immutable segment for setting
-   * immutable CellSet after its creation in immutable segment constructor
-   * @return this object
-   */
-
-  protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * @return Sum of all cell's size.
-   */
-  public long keySize() {
-    return this.keySize;
-  }
-
-  /**
-   * @return The heap overhead of this segment.
-   */
-  public long heapOverhead() {
-    long result = 0;
-    for (ImmutableSegment s : segments) {
-      result += s.heapOverhead();
-    }
-    return result;
-  }
-
-  /**
-   * Updates the heap size counter of the segment by the given delta
-   */
-  protected void incSize(long delta, long heapOverhead) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  protected void incHeapOverheadSize(long delta) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public long getMinSequenceId() {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public TimeRangeTracker getTimeRangeTracker() {
-    return this.timeRangeTracker;
-  }
-
-  //*** Methods for SegmentsScanner
-  public Cell last() {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public Iterator<Cell> iterator() {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public int compare(Cell left, Cell right) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  public int compareRows(Cell left, Cell right) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * @return a set of all cells in the segment
-   */
-  protected CellSet getCellSet() {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * Returns the Cell comparator used by this segment
-   * @return the Cell comparator used by this segment
-   */
-  protected CellComparator getComparator() {
-    return comparator;
-  }
-
-  protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
-      MemstoreSize memstoreSize) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  protected long heapOverheadChange(Cell cell, boolean succ) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
-   * Returns a subset of the segment cell set, which starts with the given cell
-   * @param firstCell a cell in the segment
-   * @return a subset of the segment cell set, which starts with the given cell
-   */
-  protected SortedSet<Cell> tailSet(Cell firstCell) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  // Debug methods
-  /**
-   * Dumps all cells of the segment into the given log
-   */
-  void dump(Log log) {
-    for (ImmutableSegment s : segments) {
-      s.dump(log);
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb =
-        new StringBuilder("This is CompositeImmutableSegment and those are its segments:: ");
-    for (ImmutableSegment s : segments) {
-      sb.append(s.toString());
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 76442e1..d4e6e12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -128,20 +127,30 @@ public class DefaultMemStore extends AbstractMemStore {
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
     list.add(this.active.getScanner(readPt, 1));
-    list.addAll(this.snapshot.getScanners(readPt, 0));
-    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
+    list.add(this.snapshot.getScanner(readPt, 0));
+    return Collections.<KeyValueScanner> singletonList(
+      new MemStoreScanner(getComparator(), list));
   }
 
-  // the getSegments() method is used for tests only
-  @VisibleForTesting
   @Override
-  protected List<Segment> getSegments() {
+  protected List<Segment> getSegments() throws IOException {
     List<Segment> list = new ArrayList<Segment>(2);
     list.add(this.active);
-    list.addAll(this.snapshot.getAllSegments());
+    list.add(this.snapshot);
     return list;
   }
 
+  /**
+   * @param cell Find the row that comes after this one.  If null, we return the
+   * first.
+   * @return Next row or null if none found.
+   */
+  Cell getNextRow(final Cell cell) {
+    return getLowest(
+        getNextRow(cell, this.active.getCellSet()),
+        getNextRow(cell, this.snapshot.getCellSet()));
+  }
+
   @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b664a4a..e11a31c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -6484,8 +6483,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         final Configuration conf, final HTableDescriptor hTableDescriptor,
         final WAL wal, final boolean initialize)
   throws IOException {
-    LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor
-        + " RootDir = " + rootDir +
+    LOG.info("creating HRegion " + info.getTable().getNameAsString()
+        + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
         " Table name == " + info.getTable().getNameAsString());
     FileSystem fs = FileSystem.get(conf);
     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 547d332..4cdb29d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -30,10 +30,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
 
 /**
  * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
@@ -73,14 +69,6 @@ public class ImmutableSegment extends Segment {
 
   /////////////////////  CONSTRUCTORS  /////////////////////
   /**------------------------------------------------------------------------
-   * Empty C-tor to be used only for CompositeImmutableSegment
-   */
-  protected ImmutableSegment(CellComparator comparator) {
-    super(comparator);
-    this.timeRange = null;
-  }
-
-  /**------------------------------------------------------------------------
    * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
    * This C-tor should be used when active MutableSegment is pushed into the compaction
    * pipeline and becomes an ImmutableSegment.
@@ -154,15 +142,6 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }
 
-  public long getNumOfSegments() {
-    return 1;
-  }
-
-  public List<Segment> getAllSegments() {
-    List<Segment> res = new ArrayList<Segment>(Arrays.asList(this));
-    return res;
-  }
-
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
    * based on CellArrayMap.
@@ -253,7 +232,7 @@ public class ImmutableSegment extends Segment {
     Cell curCell;
     int idx = 0;
     // create this segment scanner with maximal possible read point, to go over all Cells
-    KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
+    SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
 
     try {
       while ((curCell = segmentScanner.next()) != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 29fd78a..84f88f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -56,7 +56,7 @@ public class MemStoreCompactor {
 
   // The upper bound for the number of segments we store in the pipeline prior to merging.
   // This constant is subject to further experimentation.
-  private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
 
@@ -276,8 +276,6 @@ public class MemStoreCompactor {
     case NONE: action = Action.NOOP;
       break;
     case BASIC: action = Action.MERGE;
-      // if multiple segments appear in the pipeline flush them to the disk later together
-      compactingMemStore.setCompositeSnapshot(true);
       break;
     case EAGER: action = Action.COMPACT;
       break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
index fa7c342..77cea51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
@@ -25,32 +25,19 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class MemstoreSize {
 
+  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
+
   private long dataSize;
   private long heapOverhead;
-  final private boolean isEmpty;
-
-  static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);
 
   public MemstoreSize() {
     dataSize = 0;
     heapOverhead = 0;
-    isEmpty = false;
-  }
-
-  public MemstoreSize(boolean isEmpty) {
-    dataSize = 0;
-    heapOverhead = 0;
-    this.isEmpty = isEmpty;
-  }
-
-  public boolean isEmpty() {
-    return isEmpty;
   }
 
   public MemstoreSize(long dataSize, long heapOverhead) {
     this.dataSize = dataSize;
     this.heapOverhead = heapOverhead;
-    this.isEmpty = false;
   }
 
   public void incMemstoreSize(long dataSize, long heapOverhead) {
@@ -74,13 +61,11 @@ public class MemstoreSize {
   }
 
   public long getDataSize() {
-
-    return isEmpty ? 0 : dataSize;
+    return dataSize;
   }
 
   public long getHeapOverhead() {
-
-    return isEmpty ? 0 : heapOverhead;
+    return heapOverhead;
   }
 
   @Override
@@ -89,7 +74,7 @@ public class MemstoreSize {
       return false;
     }
     MemstoreSize other = (MemstoreSize) obj;
-    return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
+    return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 8581517..afdfe6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -18,9 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -66,15 +64,6 @@ public abstract class Segment {
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;
 
-  // Empty constructor to be used when Segment is used as interface,
-  // and there is no need in true Segments state
-  protected Segment(CellComparator comparator) {
-    this.comparator = comparator;
-    this.dataSize = new AtomicLong(0);
-    this.heapOverhead = new AtomicLong(0);
-    this.timeRangeTracker = new TimeRangeTracker();
-  }
-
   // This constructor is used to create empty Segments.
   protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     this.cellSet.set(cellSet);
@@ -102,7 +91,7 @@ public abstract class Segment {
    * Creates the scanner for the given read point
    * @return a scanner for the given read point
    */
-  public KeyValueScanner getScanner(long readPoint) {
+  public SegmentScanner getScanner(long readPoint) {
     return new SegmentScanner(this, readPoint);
   }
 
@@ -110,16 +99,10 @@ public abstract class Segment {
    * Creates the scanner for the given read point, and a specific order in a list
    * @return a scanner for the given read point
    */
-  public KeyValueScanner getScanner(long readPoint, long order) {
+  public SegmentScanner getScanner(long readPoint, long order) {
     return new SegmentScanner(this, readPoint, order);
   }
 
-  public List<KeyValueScanner> getScanners(long readPoint, long order) {
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
-    scanners.add(getScanner(readPoint, order));
-    return scanners;
-  }
-
   /**
    * @return whether the segment has any cells
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 7e53026..01e07ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -47,13 +47,6 @@ public final class SegmentFactory {
     return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
   }
 
-  // create composite immutable segment from a list of segments
-  public CompositeImmutableSegment createCompositeImmutableSegment(
-      final CellComparator comparator, List<ImmutableSegment> segments) {
-    return new CompositeImmutableSegment(comparator, segments);
-
-  }
-
   // create new flat immutable segment from compacting old immutable segments
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
@@ -109,9 +102,6 @@ public final class SegmentFactory {
 
   private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
     List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
-    if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
-      return null;
-    }
     for (ImmutableSegment segment : segments) {
       mslabs.add(segment.getMemStoreLAB());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 0c1880c..b0b63a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -137,7 +137,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
         CellComparator.COMPARATOR, store, regionServicesForStores,
         HColumnDescriptor.MemoryCompaction.EAGER);
-
     this.memstore.add(kv1.clone(), null);
     // As compaction is starting in the background the repetition
     // of the k1 might be removed BUT the scanners created earlier
@@ -178,9 +177,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     // Add more versions to make it a little more interesting.
     Thread.sleep(1);
     addRows(this.memstore);
-    ((CompactingMemStore)this.memstore).setCompositeSnapshot(true);
-
-
     Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
     assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
         new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
@@ -281,9 +277,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     this.memstore.upsert(l, 2, null);// readpoint is 2
     MemstoreSize newSize = this.memstore.size();
-    assertTrue("\n<<< The old size is " + oldSize.getDataSize() + " and the new size is "
-        + newSize.getDataSize() + "\n",
-        newSize.getDataSize() > oldSize.getDataSize());
+    assert (newSize.getDataSize() > oldSize.getDataSize());
     //The kv1 should be removed.
     assert (memstore.getActive().getCellsCount() == 2);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 93d28d5..27ed295 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -65,6 +65,8 @@ import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -178,10 +180,6 @@ public class TestDefaultMemStore {
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
-
-//    assertTrue("\n<<< The memstore scanners without snapshot are: \n" + memstorescanners
-//        + "\n",false);
-
     try {
       while (s.next(result)) {
         LOG.info(result);
@@ -209,10 +207,8 @@ public class TestDefaultMemStore {
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
     int snapshotIndex = 5;
-
     try {
       while (s.next(result)) {
-
         LOG.info(result);
         // Assert the stuff is coming out in right order.
         assertTrue(CellUtil.matchingRow(result.get(0), Bytes.toBytes(count)));
@@ -220,7 +216,6 @@ public class TestDefaultMemStore {
         assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
         count++;
         if (count == snapshotIndex) {
-
           MemStoreSnapshot snapshot = this.memstore.snapshot();
           this.memstore.clearSnapshot(snapshot.getId());
           // Added more rows into kvset.  But the scanner wont see these rows.
@@ -232,8 +227,7 @@ public class TestDefaultMemStore {
     } finally {
       s.close();
     }
-    assertEquals("\n<<< The row count is " + rowCount + " and the iteration count is " + count,
-        rowCount, count);
+    assertEquals(rowCount, count);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/79e5efd3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 332a125..133c53b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -22,7 +22,13 @@ import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -32,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -50,48 +55,40 @@ public class TestWalAndCompactingMemStoreFlush {
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
-  public static final TableName TABLENAME =
-      TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");
+  public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
+      "t1");
 
-  public static final byte[][] FAMILIES =
-      { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
+  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
+      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
 
   public static final byte[] FAMILY1 = FAMILIES[0];
   public static final byte[] FAMILY2 = FAMILIES[1];
   public static final byte[] FAMILY3 = FAMILIES[2];
 
   private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
-    MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE;
-    assertEquals(memstrsize1.getDataSize(), 0);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-    int i = 0;
+    int i=0;
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     for (byte[] family : FAMILIES) {
       HColumnDescriptor hcd = new HColumnDescriptor(family);
       // even column families are going to have compacted memstore
-
       if(i%2 == 0) {
         hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.valueOf(
             conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
       } else {
         hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.NONE);
       }
-
       htd.addFamily(hcd);
       i++;
     }
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
+
     HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
     Path path = new Path(DIR, callingMethod);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-    HRegion result = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-    return result;
+    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
   }
 
   // A helper function to create puts.
   private Put createPut(int familyNum, int putNum) {
-    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] qf  = Bytes.toBytes("q" + familyNum);
     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     Put p = new Put(row);
@@ -101,7 +98,7 @@ public class TestWalAndCompactingMemStoreFlush {
 
   // A helper function to create double puts, so something can be compacted later.
   private Put createDoublePut(int familyNum, int putNum) {
-    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] qf  = Bytes.toBytes("q" + familyNum);
     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     Put p = new Put(row);
@@ -125,21 +122,16 @@ public class TestWalAndCompactingMemStoreFlush {
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
-        r.getFamilyMap(family).get(qf));
+      r.getFamilyMap(family).get(qf));
     assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
-        Arrays.equals(r.getFamilyMap(family).get(qf), val));
+      Arrays.equals(r.getFamilyMap(family).get(qf), val));
   }
 
-  @Before public void setUp() throws Exception {
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-  }
-
-  // test selective flush with data-compaction
   @Test(timeout = 180000)
   public void testSelectiveFlushWithEager() throws IOException {
+
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
-
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
         FlushNonSloppyStoresFirstPolicy.class.getName());
@@ -183,14 +175,17 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Get the overall smallest LSN in the region's memstores.
-    long smallestSeqInRegionCurrentMemstorePhaseI =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
     String s = "\n\n----------------------------------\n"
-        + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
-        + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
-        + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
-        + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemstore() + "\n";
+        + "Upon initial insert and before any flush, size of CF1 is:"
+        + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+        + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
+        + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+        + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
+        + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+        + region.getStore(FAMILY3).isSloppyMemstore() + "\n";
 
     // The overall smallest LSN in the region's memstores should be the same as
     // the LSN of the smallest edit in CF1
@@ -205,12 +200,12 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    String msg = "totalMemstoreSize=" + totalMemstoreSize +
-        " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI +
-        " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI +
-        " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
-    assertEquals(msg, totalMemstoreSize,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
+    String msg = "totalMemstoreSize="+totalMemstoreSize +
+        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
+        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
+        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
+    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // We have big compacting memstore CF1 and two small memstores:
@@ -230,8 +225,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
 
-    long smallestSeqInRegionCurrentMemstorePhaseII =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
@@ -265,20 +260,16 @@ public class TestWalAndCompactingMemStoreFlush {
 
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseII + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";
 
     // How much does the CF1 memstore occupy? Will be used later.
     MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
     long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
 
     s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
-        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"
-        + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
-        + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region
-        .getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4])
-        .getFlushedCellsSize() + "; the entire region size is: " + region.getMemstoreSize() + "\n";
-    ;
+        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
+
 
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // Flush again, CF1 is flushed to disk
@@ -291,22 +282,21 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
 
-    long smallestSeqInRegionCurrentMemstorePhaseIV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
 
     s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
-        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
-        + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region
-        .getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4])
-        .getFlushedCellsSize() + "\n";
+        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
+        + "\n";
 
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseIV + "\n" + "the entire region size is: " + region.getMemstoreSize() + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
+        + "\n";
 
     // CF1's pipeline component (inserted before first flush) should be flushed to disk
     // CF2 should be flushed to disk
@@ -331,21 +321,13 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
-    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSizePhaseV);
+    assertEquals(MemstoreSize.EMPTY_SIZE , cf1MemstoreSizePhaseV);
     assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV);
     assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV);
 
-    s = s + "----AFTER THIRD FLUSH, the entire region size is:" + region.getMemstoreSize()
-        + " (empty memstore size is " + MemstoreSize.EMPTY_SIZE
-        + "), while the sizes of each memstore are as following \ncf1: " + cf1MemstoreSizePhaseV
-        + ", cf2: " + cf2MemstoreSizePhaseV + ", cf3: " + cf3MemstoreSizePhaseV + ", cf4: " + region
-        .getStore(FAMILIES[4]).getSizeOfMemStore() + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
-        + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize()
-        + ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + "\n";
-
     // What happens when we hit the memstore limit, but we are not able to find
     // any Column Family above the threshold?
     // In that case, we should flush all the CFs.
@@ -363,22 +345,24 @@ public class TestWalAndCompactingMemStoreFlush {
 
     region.flush(false);
 
-    s = s + "----AFTER FORTH FLUSH, The smallest sequence in region WAL is: "
+    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
         + smallestSeqInRegionCurrentMemstorePhaseV
         + ". After additional inserts and last flush, the entire region size is:" + region
-        .getMemstoreSize() + "\n----------------------------------\n";
+        .getMemstoreSize()
+        + "\n----------------------------------\n";
 
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores
     // Also compacted memstores are flushed to disk.
-    assertEquals(s, 0, region.getMemstoreSize());
+    assertEquals(0, region.getMemstoreSize());
     System.out.println(s);
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
   /*------------------------------------------------------------------------------*/
   /* Check the same as above but for index-compaction type of compacting memstore */
-  @Test(timeout = 180000) public void testSelectiveFlushWithIndexCompaction() throws IOException {
+  @Test(timeout = 180000)
+  public void testSelectiveFlushWithIndexCompaction() throws IOException {
 
     /*------------------------------------------------------------------------------*/
     /* SETUP */
@@ -395,7 +379,7 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // Initialize the region
     Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
+
     /*------------------------------------------------------------------------------*/
     /* PHASE I - insertions */
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
@@ -426,8 +410,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
     // Get the overall smallest LSN in the region's memstores.
-    long smallestSeqInRegionCurrentMemstorePhaseI =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
     /*------------------------------------------------------------------------------*/
     /* PHASE I - validation */
@@ -443,8 +427,8 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(totalMemstoreSizePhaseI,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
+    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     /*------------------------------------------------------------------------------*/
     /* PHASE I - Flush */
@@ -475,8 +459,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseII =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
     long totalMemstoreSizePhaseII = region.getMemstoreSize();
@@ -484,13 +468,13 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE II - validation */
     // CF1 was flushed to memory, should be flattened and take less space
-    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
+    assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize());
     assertTrue(cf1MemstoreSizePhaseII.getHeapOverhead() < cf1MemstoreSizePhaseI.getHeapOverhead());
     // CF2 should become empty
     assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
     // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
     // if compacted CF# should be at least twice less because its every key was duplicated
-    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
+    assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
     assertTrue(
         cf3MemstoreSizePhaseI.getHeapOverhead() / 2 < cf3MemstoreSizePhaseII.getHeapOverhead());
 
@@ -500,8 +484,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
     // items in CF1/2
-    assertEquals(totalMemstoreSizePhaseII,
-        cf1MemstoreSizePhaseII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
+    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
+        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
 
     /*------------------------------------------------------------------------------*/
     /*------------------------------------------------------------------------------*/
@@ -529,8 +513,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
     // items in CF1/2
-    assertEquals(totalMemstoreSizePhaseIII,
-        cf1MemstoreSizePhaseIII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
+    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
+        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
 
     /*------------------------------------------------------------------------------*/
     /* PHASE III - Flush */
@@ -546,8 +530,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseIV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
 
     /*------------------------------------------------------------------------------*/
@@ -577,8 +561,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long totalMemstoreSizePhaseV = region.getMemstoreSize();
 
     /*------------------------------------------------------------------------------*/
@@ -633,30 +617,22 @@ public class TestWalAndCompactingMemStoreFlush {
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
-  // test WAL behavior together with selective flush while data-compaction
-  @Test(timeout = 180000) public void testDCwithWAL() throws IOException {
-
-    MemstoreSize checkSize = MemstoreSize.EMPTY_SIZE;
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
+  @Test(timeout = 180000)
+  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
-        FlushNonSloppyStoresFirstPolicy.class.getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
+        .getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
+        1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
     // set memstore to do data compaction and not to use the speculative scan
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(HColumnDescriptor.MemoryCompaction.EAGER));
 
-    MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE;
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
     // Intialize the HRegion
     HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
-
-    MemstoreSize cf2MemstoreSizePhase0 = region.getStore(FAMILY2).getSizeOfMemStore();
-    MemstoreSize cf1MemstoreSizePhase0 = region.getStore(FAMILY1).getSizeOfMemStore();
-
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));
@@ -676,7 +652,6 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // Find the sizes of the memstores of each CF.
     MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
-    //boolean oldCF2 = region.getStore(FAMILY2).isSloppyMemstore();
     MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
 
@@ -687,20 +662,16 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    String msg = "\n<<< totalMemstoreSize=" + totalMemstoreSize +
-        " DefaultMemStore.DEEP_OVERHEAD=" + DefaultMemStore.DEEP_OVERHEAD +
-        " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI +
-        " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI +
-        " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
-    assertEquals(msg, totalMemstoreSize,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
-            + cf3MemstoreSizePhaseI.getDataSize());
+    String msg = "totalMemstoreSize="+totalMemstoreSize +
+        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
+        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
+        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
+        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
+    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     // Flush!
     CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
-    MemStore cms2 = ((HStore) region.getStore(FAMILY2)).memstore;
-    MemstoreSize memstrsize2 = cms2.getSnapshotSize();
-    MemstoreSize flshsize2 = cms2.getFlushableSize();
     CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
     cms1.flushInMemory();
     cms3.flushInMemory();
@@ -713,22 +684,15 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
-    MemstoreSize newSize = new MemstoreSize();
 
     // CF2 should have been cleared
-    assertEquals(
-        msg + "\n<<< CF2 is compacting " + ((HStore) region.getStore(FAMILY2)).memstore.isSloppy()
-            + ", snapshot and flushable size BEFORE flush " + memstrsize2 + "; " + flshsize2
-            + ", snapshot and flushable size AFTER flush " + cms2.getSnapshotSize() + "; " + cms2
-            .getFlushableSize() + "\n<<< cf2 size " + cms2.size() + "; the checked size "
-            + cf2MemstoreSizePhaseII + "; memstore empty size " + MemstoreSize.EMPTY_SIZE
-            + "; check size " + checkSize + "\n<<< first first first CF2 size "
-            + cf2MemstoreSizePhase0 + "; first first first CF1 size " + cf1MemstoreSizePhase0
-            + "; new new new size " + newSize + "\n", MemstoreSize.EMPTY_SIZE,
-        cf2MemstoreSizePhaseII);
-
-    String s = "\n\n----------------------------------\n" + "Upon initial insert and flush, LSN of CF1 is:"
-        + smallestSeqCF1PhaseII + ". LSN of CF2 is:" + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
+
+    String s = "\n\n----------------------------------\n"
+        + "Upon initial insert and flush, LSN of CF1 is:"
+        + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
+        + smallestSeqCF2PhaseII + ". LSN of CF3 is:"
+        + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
         + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
 
     // Add same entries to compact them later
@@ -754,8 +718,8 @@ public class TestWalAndCompactingMemStoreFlush {
 
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseIII + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
 
     // Flush!
     cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
@@ -772,22 +736,20 @@ public class TestWalAndCompactingMemStoreFlush {
 
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseIV + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n";
 
     // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
-    assertTrue(s,
-        smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
+    assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV >
+        smallestSeqInRegionCurrentMemstorePhaseIII);
     assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
     assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
 
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
-  // test WAL behavior together with selective flush while index-compaction
   @Test(timeout = 180000)
-  public void tstICwithWAL() throws IOException {
-
+  public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);