You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/12 14:36:06 UTC
hbase git commit: HBASE-17434: New synchronization scheme for
compaction pipeline
Repository: hbase
Updated Branches:
refs/heads/master f7d0f15c9 -> 2f8ddf6fc
HBASE-17434: New synchronization scheme for compaction pipeline
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f8ddf6f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f8ddf6f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f8ddf6f
Branch: refs/heads/master
Commit: 2f8ddf6fc5f904f0273b07469286e01aa02c7da5
Parents: f7d0f15
Author: eshcar <es...@yahoo-inc.com>
Authored: Sun Jan 8 22:30:44 2017 +0200
Committer: Michael Stack <st...@apache.org>
Committed: Thu Jan 12 06:35:58 2017 -0800
----------------------------------------------------------------------
.../hbase/regionserver/CompactingMemStore.java | 6 +-
.../hbase/regionserver/CompactionPipeline.java | 78 ++++++++++++--------
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 +
3 files changed, 53 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f8ddf6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e1289f8..99c1685 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting
@Override
protected List<Segment> getSegments() {
- List<Segment> pipelineList = pipeline.getSegments();
- List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+ List<? extends Segment> pipelineList = pipeline.getSegments();
+ List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
list.add(this.active);
list.addAll(pipelineList);
list.add(this.snapshot);
@@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
- List<Segment> pipelineList = pipeline.getSegments();
+ List<? extends Segment> pipelineList = pipeline.getSegments();
long order = pipelineList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f8ddf6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9d5df77..fafdbee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -25,50 +25,65 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
- * It supports pushing a segment at the head of the pipeline and pulling a segment from the
- * tail to flush to disk.
- * It also supports swap operation to allow the compactor swap a subset of the segments with a new
- * (compacted) one. This swap succeeds only if the version number passed with the list of segments
- * to swap is the same as the current version of the pipeline.
- * The pipeline version is updated whenever swapping segments or pulling the segment at the tail.
+ * It supports pushing a segment at the head of the pipeline and removing a segment from the
+ * tail when it is flushed to disk.
+ * It also supports a swap method that lets in-memory compaction swap a subset of the segments
+ * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version
+ * number passed with the list of segments to swap is the same as the current version of the
+ * pipeline.
+ * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
+ * and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
+ * The pipeline version is updated by swap(); it is used to identify conflicting operations at the
+ * suffix of the pipeline.
+ *
+ * The synchronization model is copy-on-write. Methods which change the structure of the
+ * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
+ * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
+ * method accesses the read-only copy more than once it makes a local copy of it
+ * to ensure it accesses the same copy.
+ *
+ * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
+ * and version number.
*/
@InterfaceAudience.Private
public class CompactionPipeline {
private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
public final static long FIXED_OVERHEAD = ClassSize
- .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
- public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
+ .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+ public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
private final RegionServicesForStores region;
- private LinkedList<ImmutableSegment> pipeline;
- private long version;
+ private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
+ // The list is volatile to avoid reading a newly allocated reference before the constructor has run
+ private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
+ // Version is volatile to ensure it is atomically read when not using a lock
+ private volatile long version = 0;
public CompactionPipeline(RegionServicesForStores region) {
this.region = region;
- this.pipeline = new LinkedList<>();
- this.version = 0;
}
public boolean pushHead(MutableSegment segment) {
ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(segment);
synchronized (pipeline){
- return addFirst(immutableSegment);
+ boolean res = addFirst(immutableSegment);
+ readOnlyCopy = new LinkedList<>(pipeline);
+ return res;
}
}
public VersionedSegmentsList getVersionedList() {
synchronized (pipeline){
- List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
- return new VersionedSegmentsList(segmentList, version);
+ return new VersionedSegmentsList(readOnlyCopy, version);
}
}
@@ -93,8 +108,10 @@ public class CompactionPipeline {
* During index merge op this will be false and for compaction it will be true.
* @return true iff swapped tail with new segment
*/
- public boolean swap(
- VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
+ justification="Increment is done under a synchronize block so safe")
+ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
+ boolean closeSuffix) {
if (versionedList.getVersion() != version) {
return false;
}
@@ -115,6 +132,8 @@ public class CompactionPipeline {
+ ", and the number of cells in new segment is:" + count);
}
swapSuffix(suffix, segment, closeSuffix);
+ readOnlyCopy = new LinkedList<>(pipeline);
+ version++;
}
if (closeSuffix && region != null) {
// update the global memstore size counter
@@ -193,35 +212,34 @@ public class CompactionPipeline {
}
public boolean isEmpty() {
- return pipeline.isEmpty();
+ return readOnlyCopy.isEmpty();
}
- public List<Segment> getSegments() {
- synchronized (pipeline){
- return new LinkedList<>(pipeline);
- }
+ public List<? extends Segment> getSegments() {
+ return readOnlyCopy;
}
public long size() {
- return pipeline.size();
+ return readOnlyCopy.size();
}
public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE;
- if (!isEmpty()) {
- minSequenceId = pipeline.getLast().getMinSequenceId();
+ LinkedList<? extends Segment> localCopy = readOnlyCopy;
+ if (!localCopy.isEmpty()) {
+ minSequenceId = localCopy.getLast().getMinSequenceId();
}
return minSequenceId;
}
public MemstoreSize getTailSize() {
- if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
- return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+ LinkedList<? extends Segment> localCopy = readOnlyCopy;
+ if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+ return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
}
- private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
+ private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
- version++;
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f8ddf6f/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 6e8f831..ceaadbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -325,6 +325,7 @@ public class TestHeapSize {
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
expected += ClassSize.estimateBase(CompactionPipeline.class, false);
expected += ClassSize.estimateBase(LinkedList.class, false);
+ expected += ClassSize.estimateBase(LinkedList.class, false);
expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
if (expected != actual) {
@@ -333,6 +334,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicBoolean.class, true);
ClassSize.estimateBase(CompactionPipeline.class, true);
ClassSize.estimateBase(LinkedList.class, true);
+ ClassSize.estimateBase(LinkedList.class, true);
ClassSize.estimateBase(MemStoreCompactor.class, true);
ClassSize.estimateBase(AtomicBoolean.class, true);
assertEquals(expected, actual);