Posted to commits@hbase.apache.org by el...@apache.org on 2017/01/23 23:01:45 UTC

[12/50] [abbrv] hbase git commit: HBASE-17081 Flush the entire CompactingMemStore content to disk - recommit (Anastasia)

HBASE-17081 Flush the entire CompactingMemStore content to disk - recommit
(Anastasia)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b779143f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b779143f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b779143f

Branch: refs/heads/HBASE-16961
Commit: b779143fdcfb1ae3bfe04f2434d6ca3d5f11b587
Parents: 805d39f
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Jan 18 14:40:47 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Jan 18 14:40:47 2017 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  |  64 +++-
 .../hbase/regionserver/CompactionPipeline.java  |  12 +
 .../regionserver/CompositeImmutableSegment.java | 306 +++++++++++++++++++
 .../hbase/regionserver/ImmutableSegment.java    |  22 +-
 .../hbase/regionserver/MemStoreCompactor.java   |   2 +-
 .../hadoop/hbase/regionserver/MemstoreSize.java |  25 +-
 .../hadoop/hbase/regionserver/Segment.java      |  21 +-
 .../hbase/regionserver/SegmentFactory.java      |  10 +
 .../TestWalAndCompactingMemStoreFlush.java      |  11 +-
 9 files changed, 452 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
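
In short, this patch lets a flush push the entire compaction pipeline into the
snapshot as a single CompositeImmutableSegment instead of only the pipeline tail,
controlled per memstore by CompactingMemStore#setCompositeSnapshot(boolean)
(default true). A hedged sketch of toggling it from test-style code, mirroring the
pattern used in TestWalAndCompactingMemStoreFlush at the end of this patch; the
'region' and FAMILY1 variables are assumptions borrowed from that test:

    // Assumes an HRegion 'region' whose FAMILY1 store is backed by a CompactingMemStore,
    // as in the test below; the 'memstore' field is package-private, so this only
    // compiles from the org.apache.hadoop.hbase.regionserver package.
    CompactingMemStore cms =
        (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
    cms.setCompositeSnapshot(false);   // old behaviour: snapshot only the pipeline tail
    assert !cms.isCompositeSnapshot();
    cms.setCompositeSnapshot(true);    // new default: snapshot the whole pipeline at once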


http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 99c1685..ed7d274 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -72,6 +72,7 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
   @VisibleForTesting
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+  private boolean compositeSnapshot = true;
 
   public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
       + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
@@ -160,7 +161,12 @@ public class CompactingMemStore extends AbstractMemStore {
       stopCompaction();
       pushActiveToPipeline(this.active);
       snapshotId = EnvironmentEdgeManager.currentTime();
-      pushTailToSnapshot();
+      // in both cases whatever is pushed to snapshot is cleared from the pipeline
+      if (compositeSnapshot) {
+        pushPipelineToSnapshot();
+      } else {
+        pushTailToSnapshot();
+      }
     }
     return new MemStoreSnapshot(snapshotId, this.snapshot);
   }
@@ -173,8 +179,13 @@ public class CompactingMemStore extends AbstractMemStore {
   public MemstoreSize getFlushableSize() {
     MemstoreSize snapshotSize = getSnapshotSize();
     if (snapshotSize.getDataSize() == 0) {
-      // if snapshot is empty the tail of the pipeline is flushed
-      snapshotSize = pipeline.getTailSize();
+      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
+      if (compositeSnapshot) {
+        snapshotSize = pipeline.getPipelineSize();
+        snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
+      } else {
+        snapshotSize = pipeline.getTailSize();
+      }
     }
     return snapshotSize.getDataSize() > 0 ? snapshotSize
         : new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
@@ -221,10 +232,20 @@ public class CompactingMemStore extends AbstractMemStore {
     List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
-    list.add(this.snapshot);
+    list.addAll(this.snapshot.getAllSegments());
+
     return list;
   }
 
+  // the following methods control the composite snapshot setting
+  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
+    this.compositeSnapshot = useCompositeSnapshot;
+  }
+
+  public boolean isCompositeSnapshot() {
+    return this.compositeSnapshot;
+  }
+
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
       boolean merge) {
     return pipeline.swap(versionedList, result, !merge);
@@ -265,17 +286,20 @@ public class CompactingMemStore extends AbstractMemStore {
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<? extends Segment> pipelineList = pipeline.getSegments();
-    long order = pipelineList.size();
+    int order = pipelineList.size() + snapshot.getNumOfSegments();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element
     // The order is the Segment ordinal
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
+    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order+1);
     list.add(this.active.getScanner(readPt, order + 1));
     for (Segment item : pipelineList) {
       list.add(item.getScanner(readPt, order));
       order--;
     }
-    list.add(this.snapshot.getScanner(readPt, order));
+    for (Segment item : snapshot.getAllSegments()) {
+      list.add(item.getScanner(readPt, order));
+      order--;
+    }
     return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
   }
 
@@ -382,13 +406,37 @@ public class CompactingMemStore extends AbstractMemStore {
     pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
   }
 
+  private void pushPipelineToSnapshot() {
+    int iterationsCnt = 0;
+    boolean done = false;
+    while (!done) {
+      iterationsCnt++;
+      VersionedSegmentsList segments = pipeline.getVersionedList();
+      pushToSnapshot(segments.getStoreSegments());
+      // swap can return false if the pipeline was updated by an ongoing compaction
+      // and the version increased; the chance of this happening is very low
+      done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
+      if (iterationsCnt > 2) {
+        // in practice this loop cannot iterate more than twice
+        // (compaction is stopped and nothing restarts it while a snapshot is in progress),
+        // but bail out here to avoid an infinite loop caused by an unexpected error
+        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
+            " while flushing to disk.");
+        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
+        break;
+      }
+    }
+  }
+
   private void pushToSnapshot(List<ImmutableSegment> segments) {
     if(segments.isEmpty()) return;
     if(segments.size() == 1 && !segments.get(0).isEmpty()) {
       this.snapshot = segments.get(0);
       return;
+    } else { // create composite snapshot
+      this.snapshot =
+          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
     }
-    // TODO else craete composite snapshot
   }
 
   private RegionServicesForStores getRegionServices() {
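
The pushPipelineToSnapshot() method added above uses an optimistic, versioned swap:
it takes a versioned snapshot of the pipeline, publishes it, and the swap succeeds
only if no concurrent in-memory compaction bumped the version in between, with a
small retry cap as a safety net. A standalone, hedged sketch of that pattern follows;
VersionedPipeline, Snapshot and PipelineDrainer are illustrative names, not HBase
classes:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: a pipeline whose snapshot carries the version it was taken at.
    class VersionedPipeline<T> {
      private final List<T> segments = new ArrayList<>();
      private long version = 0;

      // One atomic read of (contents, version), analogous to CompactionPipeline#getVersionedList().
      synchronized Snapshot<T> versionedSnapshot() {
        return new Snapshot<>(new ArrayList<>(segments), version);
      }

      // Drop the snapshotted segments only if no concurrent writer bumped the version since.
      synchronized boolean swap(Snapshot<T> seen) {
        if (version != seen.version) {
          return false;
        }
        segments.removeAll(seen.items);
        version++;
        return true;
      }

      synchronized void add(T segment) {
        segments.add(segment);
        version++;
      }

      static final class Snapshot<T> {
        final List<T> items;
        final long version;
        Snapshot(List<T> items, long version) {
          this.items = items;
          this.version = version;
        }
      }
    }

    class PipelineDrainer {
      // Bounded optimistic retry, mirroring the shape of pushPipelineToSnapshot() above:
      // snapshot, try to swap it out, retry a couple of times, then give up.
      static <T> List<T> drain(VersionedPipeline<T> pipeline, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          VersionedPipeline.Snapshot<T> seen = pipeline.versionedSnapshot();
          if (pipeline.swap(seen)) {
            return seen.items;       // drained exactly what was snapshotted
          }
          // a concurrent writer bumped the version between snapshot and swap; retry
        }
        return new ArrayList<>();    // analogous to the warn-and-reset branch above
      }
    }

In normal operation the swap succeeds on the first attempt, because compaction is
stopped before the snapshot is requested; the retry cap only guards against an
unexpected livelock.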

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index fafdbee..e533bd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -238,6 +238,18 @@ public class CompactionPipeline {
     return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
   }
 
+  public MemstoreSize getPipelineSize() {
+    long keySize = 0;
+    long heapOverhead = 0;
+    LinkedList<? extends Segment> localCopy = readOnlyCopy;
+    if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+    for (Segment segment : localCopy) {
+      keySize += segment.keySize();
+      heapOverhead += segment.heapOverhead();
+    }
+    return new MemstoreSize(keySize, heapOverhead);
+  }
+
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
new file mode 100644
index 0000000..30d17fb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -0,0 +1,306 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+
+/**
+ * A CompositeImmutableSegment is a collection of ImmutableSegments that supports
+ * the interface of a single ImmutableSegment.
+ * The CompositeImmutableSegment is intended to be used only as a snapshot,
+ * thus only the relevant parts of the interface are supported.
+ */
+@InterfaceAudience.Private
+public class CompositeImmutableSegment extends ImmutableSegment {
+
+  private final List<ImmutableSegment> segments;
+  private final CellComparator comparator;
+  // CompositeImmutableSegment is used for snapshots, and a snapshot should
+  // support the getTimeRangeTracker() interface.
+  // Thus we hold a constant TRT, built at construction time from the TRTs of the given segments.
+  private final TimeRangeTracker timeRangeTracker;
+
+  private long keySize = 0;
+
+  public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
+    super(comparator);
+    this.comparator = comparator;
+    this.segments = segments;
+    this.timeRangeTracker = new TimeRangeTracker();
+    for (ImmutableSegment s : segments) {
+      this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
+      this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
+      this.keySize += s.keySize();
+    }
+  }
+
+  @VisibleForTesting
+  public List<Segment> getAllSegments() {
+    return new LinkedList<Segment>(segments);
+  }
+
+  public int getNumOfSegments() {
+    return segments.size();
+  }
+
+  /**
+   * Builds a special scanner for the MemStoreSnapshot object that is different than the
+   * general segment scanner.
+   * @return a special scanner for the MemStoreSnapshot object
+   */
+  public KeyValueScanner getSnapshotScanner() {
+    return getScanner(Long.MAX_VALUE, Long.MAX_VALUE);
+  }
+
+  /**
+   * @return whether the segment has any cells
+   */
+  public boolean isEmpty() {
+    for (ImmutableSegment s : segments) {
+      if (!s.isEmpty()) return false;
+    }
+    return true;
+  }
+
+  /**
+   * @return number of cells in segment
+   */
+  public int getCellsCount() {
+    int result = 0;
+    for (ImmutableSegment s : segments) {
+      result += s.getCellsCount();
+    }
+    return result;
+  }
+
+  /**
+   * @return the first cell in the segment that has equal or greater key than the given cell
+   */
+  public Cell getFirstAfter(Cell cell) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * Closing a segment before it is being discarded
+   */
+  public void close() {
+    for (ImmutableSegment s : segments) {
+      s.close();
+    }
+  }
+
+  /**
+   * If the segment has a memory allocator, the cell is cloned into that space and returned;
+   * otherwise the given cell is returned.
+   * @return either the given cell or its clone
+   */
+  public Cell maybeCloneWithAllocator(Cell cell) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public long getMinTimestamp(){
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * Creates the scanner for the given read point
+   * @return a scanner for the given read point
+   */
+  public KeyValueScanner getScanner(long readPoint) {
+    // Long.MAX_VALUE is DEFAULT_SCANNER_ORDER
+    return getScanner(readPoint,Long.MAX_VALUE);
+  }
+
+  /**
+   * Creates the scanner for the given read point, and a specific order in a list
+   * @return a scanner for the given read point
+   */
+  public KeyValueScanner getScanner(long readPoint, long order) {
+    KeyValueScanner resultScanner;
+    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
+    for (ImmutableSegment s : segments) {
+      list.add(s.getScanner(readPoint, order));
+    }
+
+    try {
+      resultScanner = new MemStoreScanner(getComparator(), list);
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    }
+
+    return resultScanner;
+  }
+
+  public boolean isTagsPresent() {
+    for (ImmutableSegment s : segments) {
+      if (s.isTagsPresent()) return true;
+    }
+    return false;
+  }
+
+  public void incScannerCount() {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public void decScannerCount() {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * Sets the CellSet of the segment - used only by flat immutable segments to set the
+   * immutable CellSet after its creation in the immutable segment constructor
+   * @return this object
+   */
+
+  protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * @return Sum of all cell sizes.
+   */
+  public long keySize() {
+    return this.keySize;
+  }
+
+  /**
+   * @return The heap overhead of this segment.
+   */
+  public long heapOverhead() {
+    long result = 0;
+    for (ImmutableSegment s : segments) {
+      result += s.heapOverhead();
+    }
+    return result;
+  }
+
+  /**
+   * Updates the heap size counter of the segment by the given delta
+   */
+  protected void incSize(long delta, long heapOverhead) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  protected void incHeapOverheadSize(long delta) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public long getMinSequenceId() {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public TimeRangeTracker getTimeRangeTracker() {
+    return this.timeRangeTracker;
+  }
+
+  //*** Methods for SegmentsScanner
+  public Cell last() {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public Iterator<Cell> iterator() {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public int compare(Cell left, Cell right) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  public int compareRows(Cell left, Cell right) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * @return a set of all cells in the segment
+   */
+  protected CellSet getCellSet() {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * Returns the Cell comparator used by this segment
+   * @return the Cell comparator used by this segment
+   */
+  protected CellComparator getComparator() {
+    return comparator;
+  }
+
+  protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
+      MemstoreSize memstoreSize) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  protected long heapOverheadChange(Cell cell, boolean succ) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  /**
+   * Returns a subset of the segment cell set, which starts with the given cell
+   * @param firstCell a cell in the segment
+   * @return a subset of the segment cell set, which starts with the given cell
+   */
+  protected SortedSet<Cell> tailSet(Cell firstCell) {
+    throw new IllegalStateException("Not supported by CompositeImmutableSegment");
+  }
+
+  // Debug methods
+  /**
+   * Dumps all cells of the segment into the given log
+   */
+  void dump(Log log) {
+    for (ImmutableSegment s : segments) {
+      s.dump(log);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb =
+        new StringBuilder("This is a CompositeImmutableSegment and these are its segments: ");
+    for (ImmutableSegment s : segments) {
+      sb.append(s.toString());
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 0fae6c3..faa9b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
@@ -68,6 +71,14 @@ public class ImmutableSegment extends Segment {
 
   /////////////////////  CONSTRUCTORS  /////////////////////
   /**------------------------------------------------------------------------
+   * Empty C-tor to be used only for CompositeImmutableSegment
+   */
+  protected ImmutableSegment(CellComparator comparator) {
+    super(comparator);
+    this.timeRange = null;
+  }
+
+  /**------------------------------------------------------------------------
    * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
    * This C-tor should be used when active MutableSegment is pushed into the compaction
    * pipeline and becomes an ImmutableSegment.
@@ -141,6 +152,15 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }
 
+  public int getNumOfSegments() {
+    return 1;
+  }
+
+  public List<Segment> getAllSegments() {
+    List<Segment> res = new ArrayList<Segment>(Arrays.asList(this));
+    return res;
+  }
+
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
    * based on CellArrayMap.
@@ -231,7 +251,7 @@ public class ImmutableSegment extends Segment {
     Cell curCell;
     int idx = 0;
     // create this segment scanner with maximal possible read point, to go over all Cells
-    SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
+    KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
 
     try {
       while ((curCell = segmentScanner.next()) != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 84f88f0..2174d89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -56,7 +56,7 @@ public class MemStoreCompactor {
 
   // The upper bound for the number of segments we store in the pipeline prior to merging.
   // This constant is subject to further experimentation.
-  private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // effectively unlimited here
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
index 77cea51..fa7c342 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
@@ -25,19 +25,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class MemstoreSize {
 
-  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
-
   private long dataSize;
   private long heapOverhead;
+  private final boolean isEmpty;
+
+  static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);
 
   public MemstoreSize() {
     dataSize = 0;
     heapOverhead = 0;
+    isEmpty = false;
+  }
+
+  public MemstoreSize(boolean isEmpty) {
+    dataSize = 0;
+    heapOverhead = 0;
+    this.isEmpty = isEmpty;
+  }
+
+  public boolean isEmpty() {
+    return isEmpty;
   }
 
   public MemstoreSize(long dataSize, long heapOverhead) {
     this.dataSize = dataSize;
     this.heapOverhead = heapOverhead;
+    this.isEmpty = false;
   }
 
   public void incMemstoreSize(long dataSize, long heapOverhead) {
@@ -61,11 +74,13 @@ public class MemstoreSize {
   }
 
   public long getDataSize() {
-    return dataSize;
+
+    return isEmpty ? 0 : dataSize;
   }
 
   public long getHeapOverhead() {
-    return heapOverhead;
+
+    return isEmpty ? 0 : heapOverhead;
   }
 
   @Override
@@ -74,7 +89,7 @@ public class MemstoreSize {
       return false;
     }
     MemstoreSize other = (MemstoreSize) obj;
-    return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
+    return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
   }
 
   @Override
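
The MemstoreSize change above guards reads with an isEmpty flag so that the shared
EMPTY_SIZE constant always reports zero sizes. A hedged usage sketch, assuming
same-package access and that incMemstoreSize() accumulates into the counters (its
body is not shown in this hunk):

    MemstoreSize size = new MemstoreSize(512L, 128L);
    size.incMemstoreSize(256L, 64L);     // assumed to add to dataSize/heapOverhead
    long data = size.getDataSize();      // 768 under that assumption
    long heap = size.getHeapOverhead();  // 192

    MemstoreSize empty = new MemstoreSize(true);
    long zero = empty.getDataSize();     // always 0: reads are short-circuited by isEmpty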

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index afdfe6f..8581517 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +66,15 @@ public abstract class Segment {
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;
 
+  // Empty constructor to be used when Segment is used as an interface,
+  // and there is no need for a true Segment's state
+  protected Segment(CellComparator comparator) {
+    this.comparator = comparator;
+    this.dataSize = new AtomicLong(0);
+    this.heapOverhead = new AtomicLong(0);
+    this.timeRangeTracker = new TimeRangeTracker();
+  }
+
   // This constructor is used to create empty Segments.
   protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     this.cellSet.set(cellSet);
@@ -91,7 +102,7 @@ public abstract class Segment {
    * Creates the scanner for the given read point
    * @return a scanner for the given read point
    */
-  public SegmentScanner getScanner(long readPoint) {
+  public KeyValueScanner getScanner(long readPoint) {
     return new SegmentScanner(this, readPoint);
   }
 
@@ -99,10 +110,16 @@ public abstract class Segment {
    * Creates the scanner for the given read point, and a specific order in a list
    * @return a scanner for the given read point
    */
-  public SegmentScanner getScanner(long readPoint, long order) {
+  public KeyValueScanner getScanner(long readPoint, long order) {
     return new SegmentScanner(this, readPoint, order);
   }
 
+  public List<KeyValueScanner> getScanners(long readPoint, long order) {
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
+    scanners.add(getScanner(readPoint, order));
+    return scanners;
+  }
+
   /**
    * @return whether the segment has any cells
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 01e07ef..7e53026 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -47,6 +47,13 @@ public final class SegmentFactory {
     return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
   }
 
+  // create composite immutable segment from a list of segments
+  public CompositeImmutableSegment createCompositeImmutableSegment(
+      final CellComparator comparator, List<ImmutableSegment> segments) {
+    return new CompositeImmutableSegment(comparator, segments);
+
+  }
+
   // create new flat immutable segment from compacting old immutable segments
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
@@ -102,6 +109,9 @@ public final class SegmentFactory {
 
   private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
     List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
+    if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
+      return null;
+    }
     for (ImmutableSegment segment : segments) {
       mslabs.add(segment.getMemStoreLAB());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 133c53b..8215d53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -622,10 +622,9 @@ public class TestWalAndCompactingMemStoreFlush {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
-        .getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
-        1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+        FlushNonSloppyStoresFirstPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
     // set memstore to do data compaction and not to use the speculative scan
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
@@ -648,6 +647,10 @@ public class TestWalAndCompactingMemStoreFlush {
       region.put(createPut(2, i));
     }
 
+    // in this test check the non-composite snapshot - flushing only the tail of the pipeline
+    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
+    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
+
     long totalMemstoreSize = region.getMemstoreSize();
 
     // Find the sizes of the memstores of each CF.