You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/01/23 23:01:45 UTC
[12/50] [abbrv] hbase git commit: HBASE-17081 Flush the entire
CompactingMemStore content to disk - recommit (Anastasia)
HBASE-17081 Flush the entire CompactingMemStore content to disk - recommit
(Anastasia)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b779143f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b779143f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b779143f
Branch: refs/heads/HBASE-16961
Commit: b779143fdcfb1ae3bfe04f2434d6ca3d5f11b587
Parents: 805d39f
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Jan 18 14:40:47 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Jan 18 14:40:47 2017 +0530
----------------------------------------------------------------------
.../hbase/regionserver/CompactingMemStore.java | 64 +++-
.../hbase/regionserver/CompactionPipeline.java | 12 +
.../regionserver/CompositeImmutableSegment.java | 306 +++++++++++++++++++
.../hbase/regionserver/ImmutableSegment.java | 22 +-
.../hbase/regionserver/MemStoreCompactor.java | 2 +-
.../hadoop/hbase/regionserver/MemstoreSize.java | 25 +-
.../hadoop/hbase/regionserver/Segment.java | 21 +-
.../hbase/regionserver/SegmentFactory.java | 10 +
.../TestWalAndCompactingMemStoreFlush.java | 11 +-
9 files changed, 452 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 99c1685..ed7d274 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -72,6 +72,7 @@ public class CompactingMemStore extends AbstractMemStore {
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
@VisibleForTesting
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+ private boolean compositeSnapshot = true;
public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
+ 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
@@ -160,7 +161,12 @@ public class CompactingMemStore extends AbstractMemStore {
stopCompaction();
pushActiveToPipeline(this.active);
snapshotId = EnvironmentEdgeManager.currentTime();
- pushTailToSnapshot();
+ // in both cases whatever is pushed to snapshot is cleared from the pipeline
+ if (compositeSnapshot) {
+ pushPipelineToSnapshot();
+ } else {
+ pushTailToSnapshot();
+ }
}
return new MemStoreSnapshot(snapshotId, this.snapshot);
}
@@ -173,8 +179,13 @@ public class CompactingMemStore extends AbstractMemStore {
public MemstoreSize getFlushableSize() {
MemstoreSize snapshotSize = getSnapshotSize();
if (snapshotSize.getDataSize() == 0) {
- // if snapshot is empty the tail of the pipeline is flushed
- snapshotSize = pipeline.getTailSize();
+ // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
+ if (compositeSnapshot) {
+ snapshotSize = pipeline.getPipelineSize();
+ snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
+ } else {
+ snapshotSize = pipeline.getTailSize();
+ }
}
return snapshotSize.getDataSize() > 0 ? snapshotSize
: new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
@@ -221,10 +232,20 @@ public class CompactingMemStore extends AbstractMemStore {
List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
list.add(this.active);
list.addAll(pipelineList);
- list.add(this.snapshot);
+ list.addAll(this.snapshot.getAllSegments());
+
return list;
}
+ // the following two methods allow manipulating the composite snapshot setting
+ public void setCompositeSnapshot(boolean useCompositeSnapshot) {
+ this.compositeSnapshot = useCompositeSnapshot;
+ }
+
+ public boolean isCompositeSnapshot() {
+ return this.compositeSnapshot;
+ }
+
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
return pipeline.swap(versionedList, result, !merge);
@@ -265,17 +286,20 @@ public class CompactingMemStore extends AbstractMemStore {
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<? extends Segment> pipelineList = pipeline.getSegments();
- long order = pipelineList.size();
+ int order = pipelineList.size() + snapshot.getNumOfSegments();
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
// The order is the Segment ordinal
- List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
+ List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order+1);
list.add(this.active.getScanner(readPt, order + 1));
for (Segment item : pipelineList) {
list.add(item.getScanner(readPt, order));
order--;
}
- list.add(this.snapshot.getScanner(readPt, order));
+ for (Segment item : snapshot.getAllSegments()) {
+ list.add(item.getScanner(readPt, order));
+ order--;
+ }
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
}
@@ -382,13 +406,37 @@ public class CompactingMemStore extends AbstractMemStore {
pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
}
+ private void pushPipelineToSnapshot() {
+ int iterationsCnt = 0;
+ boolean done = false;
+ while (!done) {
+ iterationsCnt++;
+ VersionedSegmentsList segments = pipeline.getVersionedList();
+ pushToSnapshot(segments.getStoreSegments());
+ // swap can return false in case the pipeline was updated by ongoing compaction
+ // and the version increased; the chance of it happening is very low
+ done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
+ if (iterationsCnt>2) {
+ // practically it is impossible that this loop iterates more than two times
+ // (because the compaction is stopped and none restarts it while in snapshot request),
+ // however, stop here to guard against an infinite loop caused by any error
+ LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
+ " while flushing to disk.");
+ this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
+ break;
+ }
+ }
+ }
+
private void pushToSnapshot(List<ImmutableSegment> segments) {
if(segments.isEmpty()) return;
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
this.snapshot = segments.get(0);
return;
+ } else { // create composite snapshot
+ this.snapshot =
+ SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
}
- // TODO else craete composite snapshot
}
private RegionServicesForStores getRegionServices() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index fafdbee..e533bd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -238,6 +238,18 @@ public class CompactionPipeline {
return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
}
+ public MemstoreSize getPipelineSize() {
+ long keySize = 0;
+ long heapOverhead = 0;
+ LinkedList<? extends Segment> localCopy = readOnlyCopy;
+ if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+ for (Segment segment : localCopy) {
+ keySize += segment.keySize();
+ heapOverhead += segment.heapOverhead();
+ }
+ return new MemstoreSize(keySize, heapOverhead);
+ }
+
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
new file mode 100644
index 0000000..30d17fb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -0,0 +1,306 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+
+/**
+ * The CompositeImmutableSegment is created as a collection of ImmutableSegments and supports
+ * the interface of a single ImmutableSegment.
+ * The CompositeImmutableSegment is planned to be used only as a snapshot,
+ * thus only the relevant interfaces are supported.
+ */
+@InterfaceAudience.Private
+public class CompositeImmutableSegment extends ImmutableSegment {
+
+ private final List<ImmutableSegment> segments;
+ private final CellComparator comparator;
+ // CompositeImmutableSegment is used for snapshots and snapshot should
+ // support getTimeRangeTracker() interface.
+ // Thus we hold a constant TRT, built at construction time from the TRTs of the given segments.
+ private final TimeRangeTracker timeRangeTracker;
+
+ private long keySize = 0;
+
+ public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
+ super(comparator);
+ this.comparator = comparator;
+ this.segments = segments;
+ this.timeRangeTracker = new TimeRangeTracker();
+ for (ImmutableSegment s : segments) {
+ this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
+ this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
+ this.keySize += s.keySize();
+ }
+ }
+
+ @VisibleForTesting
+ public List<Segment> getAllSegments() {
+ return new LinkedList<Segment>(segments);
+ }
+
+ public int getNumOfSegments() {
+ return segments.size();
+ }
+
+ /**
+ * Builds a special scanner for the MemStoreSnapshot object that is different than the
+ * general segment scanner.
+ * @return a special scanner for the MemStoreSnapshot object
+ */
+ public KeyValueScanner getSnapshotScanner() {
+ return getScanner(Long.MAX_VALUE, Long.MAX_VALUE);
+ }
+
+ /**
+ * @return whether the segment has any cells
+ */
+ public boolean isEmpty() {
+ for (ImmutableSegment s : segments) {
+ if (!s.isEmpty()) return false;
+ }
+ return true;
+ }
+
+ /**
+ * @return number of cells in segment
+ */
+ public int getCellsCount() {
+ int result = 0;
+ for (ImmutableSegment s : segments) {
+ result += s.getCellsCount();
+ }
+ return result;
+ }
+
+ /**
+ * @return the first cell in the segment that has equal or greater key than the given cell
+ */
+ public Cell getFirstAfter(Cell cell) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * Closing a segment before it is being discarded
+ */
+ public void close() {
+ for (ImmutableSegment s : segments) {
+ s.close();
+ }
+ }
+
+ /**
+ * If the segment has a memory allocator the cell is being cloned to this space, and returned;
+ * otherwise the given cell is returned
+ * @return either the given cell or its clone
+ */
+ public Cell maybeCloneWithAllocator(Cell cell) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public long getMinTimestamp(){
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * Creates the scanner for the given read point
+ * @return a scanner for the given read point
+ */
+ public KeyValueScanner getScanner(long readPoint) {
+ // Long.MAX_VALUE is DEFAULT_SCANNER_ORDER
+ return getScanner(readPoint,Long.MAX_VALUE);
+ }
+
+ /**
+ * Creates the scanner for the given read point, and a specific order in a list
+ * @return a scanner for the given read point
+ */
+ public KeyValueScanner getScanner(long readPoint, long order) {
+ KeyValueScanner resultScanner;
+ List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
+ for (ImmutableSegment s : segments) {
+ list.add(s.getScanner(readPoint, order));
+ }
+
+ try {
+ resultScanner = new MemStoreScanner(getComparator(), list);
+ } catch (IOException ie) {
+ throw new IllegalStateException(ie);
+ }
+
+ return resultScanner;
+ }
+
+ public boolean isTagsPresent() {
+ for (ImmutableSegment s : segments) {
+ if (s.isTagsPresent()) return true;
+ }
+ return false;
+ }
+
+ public void incScannerCount() {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public void decScannerCount() {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * Setting the CellSet of the segment - used only for flat immutable segment for setting
+ * immutable CellSet after its creation in immutable segment constructor
+ * @return this object
+ */
+
+ protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * @return Sum of all cell sizes.
+ */
+ public long keySize() {
+ return this.keySize;
+ }
+
+ /**
+ * @return The heap overhead of this segment.
+ */
+ public long heapOverhead() {
+ long result = 0;
+ for (ImmutableSegment s : segments) {
+ result += s.heapOverhead();
+ }
+ return result;
+ }
+
+ /**
+ * Updates the heap size counter of the segment by the given delta
+ */
+ protected void incSize(long delta, long heapOverhead) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ protected void incHeapOverheadSize(long delta) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public long getMinSequenceId() {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public TimeRangeTracker getTimeRangeTracker() {
+ return this.timeRangeTracker;
+ }
+
+ //*** Methods for SegmentsScanner
+ public Cell last() {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public Iterator<Cell> iterator() {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public int compare(Cell left, Cell right) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ public int compareRows(Cell left, Cell right) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * @return a set of all cells in the segment
+ */
+ protected CellSet getCellSet() {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * Returns the Cell comparator used by this segment
+ * @return the Cell comparator used by this segment
+ */
+ protected CellComparator getComparator() {
+ return comparator;
+ }
+
+ protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
+ MemstoreSize memstoreSize) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ protected long heapOverheadChange(Cell cell, boolean succ) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ /**
+ * Returns a subset of the segment cell set, which starts with the given cell
+ * @param firstCell a cell in the segment
+ * @return a subset of the segment cell set, which starts with the given cell
+ */
+ protected SortedSet<Cell> tailSet(Cell firstCell) {
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
+
+ // Debug methods
+ /**
+ * Dumps all cells of the segment into the given log
+ */
+ void dump(Log log) {
+ for (ImmutableSegment s : segments) {
+ s.dump(log);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb =
+ new StringBuilder("This is CompositeImmutableSegment and those are its segments:: ");
+ for (ImmutableSegment s : segments) {
+ sb.append(s.toString());
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 0fae6c3..faa9b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
@@ -68,6 +71,14 @@ public class ImmutableSegment extends Segment {
///////////////////// CONSTRUCTORS /////////////////////
/**------------------------------------------------------------------------
+ * Empty C-tor to be used only for CompositeImmutableSegment
+ */
+ protected ImmutableSegment(CellComparator comparator) {
+ super(comparator);
+ this.timeRange = null;
+ }
+
+ /**------------------------------------------------------------------------
* Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
* This C-tor should be used when active MutableSegment is pushed into the compaction
* pipeline and becomes an ImmutableSegment.
@@ -141,6 +152,15 @@ public class ImmutableSegment extends Segment {
return this.timeRange.getMin();
}
+ public int getNumOfSegments() {
+ return 1;
+ }
+
+ public List<Segment> getAllSegments() {
+ List<Segment> res = new ArrayList<Segment>(Arrays.asList(this));
+ return res;
+ }
+
/**------------------------------------------------------------------------
* Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
* based on CellArrayMap.
@@ -231,7 +251,7 @@ public class ImmutableSegment extends Segment {
Cell curCell;
int idx = 0;
// create this segment scanner with maximal possible read point, to go over all Cells
- SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
+ KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
try {
while ((curCell = segmentScanner.next()) != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 84f88f0..2174d89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -56,7 +56,7 @@ public class MemStoreCompactor {
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
- private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
+ private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
index 77cea51..fa7c342 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
@@ -25,19 +25,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class MemstoreSize {
- static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
-
private long dataSize;
private long heapOverhead;
+ final private boolean isEmpty;
+
+ static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);
public MemstoreSize() {
dataSize = 0;
heapOverhead = 0;
+ isEmpty = false;
+ }
+
+ public MemstoreSize(boolean isEmpty) {
+ dataSize = 0;
+ heapOverhead = 0;
+ this.isEmpty = isEmpty;
+ }
+
+ public boolean isEmpty() {
+ return isEmpty;
}
public MemstoreSize(long dataSize, long heapOverhead) {
this.dataSize = dataSize;
this.heapOverhead = heapOverhead;
+ this.isEmpty = false;
}
public void incMemstoreSize(long dataSize, long heapOverhead) {
@@ -61,11 +74,13 @@ public class MemstoreSize {
}
public long getDataSize() {
- return dataSize;
+
+ return isEmpty ? 0 : dataSize;
}
public long getHeapOverhead() {
- return heapOverhead;
+
+ return isEmpty ? 0 : heapOverhead;
}
@Override
@@ -74,7 +89,7 @@ public class MemstoreSize {
return false;
}
MemstoreSize other = (MemstoreSize) obj;
- return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
+ return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index afdfe6f..8581517 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +66,15 @@ public abstract class Segment {
protected final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent;
+ // Empty constructor to be used when Segment is used as an interface,
+ // and there is no need for true Segment state
+ protected Segment(CellComparator comparator) {
+ this.comparator = comparator;
+ this.dataSize = new AtomicLong(0);
+ this.heapOverhead = new AtomicLong(0);
+ this.timeRangeTracker = new TimeRangeTracker();
+ }
+
// This constructor is used to create empty Segments.
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
this.cellSet.set(cellSet);
@@ -91,7 +102,7 @@ public abstract class Segment {
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
- public SegmentScanner getScanner(long readPoint) {
+ public KeyValueScanner getScanner(long readPoint) {
return new SegmentScanner(this, readPoint);
}
@@ -99,10 +110,16 @@ public abstract class Segment {
* Creates the scanner for the given read point, and a specific order in a list
* @return a scanner for the given read point
*/
- public SegmentScanner getScanner(long readPoint, long order) {
+ public KeyValueScanner getScanner(long readPoint, long order) {
return new SegmentScanner(this, readPoint, order);
}
+ public List<KeyValueScanner> getScanners(long readPoint, long order) {
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
+ scanners.add(getScanner(readPoint, order));
+ return scanners;
+ }
+
/**
* @return whether the segment has any cells
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 01e07ef..7e53026 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -47,6 +47,13 @@ public final class SegmentFactory {
return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
}
+ // create composite immutable segment from a list of segments
+ public CompositeImmutableSegment createCompositeImmutableSegment(
+ final CellComparator comparator, List<ImmutableSegment> segments) {
+ return new CompositeImmutableSegment(comparator, segments);
+
+ }
+
// create new flat immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
@@ -102,6 +109,9 @@ public final class SegmentFactory {
private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
+ if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
+ return null;
+ }
for (ImmutableSegment segment : segments) {
mslabs.add(segment.getMemStoreLAB());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b779143f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 133c53b..8215d53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -622,10 +622,9 @@ public class TestWalAndCompactingMemStoreFlush {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
- conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
- .getName());
- conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
- 1024);
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+ FlushNonSloppyStoresFirstPolicy.class.getName());
+ conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do data compaction and not to use the speculative scan
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
@@ -648,6 +647,10 @@ public class TestWalAndCompactingMemStoreFlush {
region.put(createPut(2, i));
}
+ // in this test check the non-composite snapshot - flushing only the tail of the pipeline
+ ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
+ ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
+
long totalMemstoreSize = region.getMemstoreSize();
// Find the sizes of the memstores of each CF.