You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink in the original page and was not preserved in this text version.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/23 01:23:40 UTC
hbase git commit: HBASE-17407: Correct update of maxFlushedSeqId in HRegion
Repository: hbase
Updated Branches:
refs/heads/master 3abd13dac -> f254e278e
HBASE-17407: Correct update of maxFlushedSeqId in HRegion
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f254e278
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f254e278
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f254e278
Branch: refs/heads/master
Commit: f254e278ece751e67c92570aef4b15fddab22a94
Parents: 3abd13d
Author: eshcar <es...@yahoo-inc.com>
Authored: Thu Jan 19 01:11:58 2017 +0200
Committer: zhangduo <zh...@apache.org>
Committed: Mon Jan 23 09:22:51 2017 +0800
----------------------------------------------------------------------
.../hbase/regionserver/CompactingMemStore.java | 24 ++++++++++++++++----
.../hbase/regionserver/CompactionPipeline.java | 8 +++++++
.../hbase/regionserver/DefaultMemStore.java | 4 +++-
.../hadoop/hbase/regionserver/HRegion.java | 10 ++++----
.../hadoop/hbase/regionserver/HStore.java | 4 ++--
.../hadoop/hbase/regionserver/MemStore.java | 10 ++++----
.../hbase/regionserver/wal/AbstractFSWAL.java | 9 ++++++++
.../regionserver/wal/SequenceIdAccounting.java | 21 +++++++++++++----
.../hadoop/hbase/wal/DisabledWALProvider.java | 6 +++++
.../java/org/apache/hadoop/hbase/wal/WAL.java | 2 ++
10 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index ed7d274..48dc880 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -124,13 +125,20 @@ public class CompactingMemStore extends AbstractMemStore {
}
/**
- * This method is called when it is clear that the flush to disk is completed.
- * The store may do any post-flush actions at this point.
- * One example is to update the WAL with sequence number that is known only at the store level.
+ * This method is called before the flush is executed.
+ * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
+ * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
*/
@Override
- public void finalizeFlush() {
- updateLowestUnflushedSequenceIdInWAL(false);
+ public long preFlushSeqIDEstimation() {
+ if(compositeSnapshot) {
+ return HConstants.NO_SEQNUM;
+ }
+ Segment segment = getLastSegment();
+ if(segment == null) {
+ return HConstants.NO_SEQNUM;
+ }
+ return segment.getMinSequenceId();
}
@Override
@@ -364,6 +372,12 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
+ private Segment getLastSegment() {
+ Segment localActive = getActive();
+ Segment tail = pipeline.getTail();
+ return tail == null ? localActive : tail;
+ }
+
private byte[] getFamilyNameInBytes() {
return store.getFamily().getName();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e533bd0..9a844e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -267,6 +267,14 @@ public class CompactionPipeline {
if(segment != null) pipeline.addLast(segment);
}
+ public Segment getTail() {
+ List<? extends Segment> localCopy = getSegments();
+ if(localCopy.isEmpty()) {
+ return null;
+ }
+ return localCopy.get(localCopy.size()-1);
+ }
+
private boolean addFirst(ImmutableSegment segment) {
pipeline.addFirst(segment);
return true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index d4e6e12..63af570 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@@ -169,7 +170,8 @@ public class DefaultMemStore extends AbstractMemStore {
}
@Override
- public void finalizeFlush() {
+ public long preFlushSeqIDEstimation() {
+ return HConstants.NO_SEQNUM;
}
@Override public boolean isSloppy() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f35d788..ef6239d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2412,9 +2412,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
- Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+ Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
for (Store store: storesToFlush) {
- flushedFamilyNames.add(store.getFamily().getName());
+ flushedFamilyNamesToSeq.put(store.getFamily().getName(),
+ ((HStore) store).preFlushSeqIDEstimation());
}
TreeMap<byte[], StoreFlushContext> storeFlushCtxs
@@ -2434,7 +2435,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
if (wal != null) {
Long earliestUnflushedSequenceIdForTheRegion =
- wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+ wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq);
if (earliestUnflushedSequenceIdForTheRegion == null) {
// This should never happen. This is how startCacheFlush signals flush cannot proceed.
String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
@@ -2677,9 +2678,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// If we get to here, the HStores have been written.
- for(Store storeToFlush :storesToFlush) {
- ((HStore) storeToFlush).finalizeFlush();
- }
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 425667a..ad23ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2509,8 +2509,8 @@ public class HStore implements Store {
}
}
- public void finalizeFlush() {
- memstore.finalizeFlush();
+ public Long preFlushSeqIDEstimation() {
+ return memstore.preFlushSeqIDEstimation();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index b094476..38d3e44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -119,12 +119,12 @@ public interface MemStore {
MemstoreSize size();
/**
- * This method is called when it is clear that the flush to disk is completed.
- * The store may do any post-flush actions at this point.
- * One example is to update the wal with sequence number that is known only at the store level.
+ * This method is called before the flush is executed.
+ * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
+ * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
*/
- void finalizeFlush();
+ long preFlushSeqIDEstimation();
- /* Return true if the memstore may need some extra memory space*/
+ /* Return true if the memstore may use some extra memory space*/
boolean isSloppy();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 316e2f6..7e3bd59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -429,6 +429,15 @@ public abstract class AbstractFSWAL<W> implements WAL {
}
@Override
+ public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
+ if (!closeBarrier.beginOp()) {
+ LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
+ return null;
+ }
+ return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
+ }
+
+ @Override
public void completeCacheFlush(byte[] encodedRegionName) {
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
closeBarrier.endOp();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 6e7ad9b..8226b82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -264,6 +264,14 @@ class SequenceIdAccounting {
* oldest/lowest outstanding edit.
*/
Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
+ Map<byte[],Long> familytoSeq = new HashMap<>();
+ for (byte[] familyName : families){
+ familytoSeq.put(familyName,HConstants.NO_SEQNUM);
+ }
+ return startCacheFlush(encodedRegionName,familytoSeq);
+ }
+
+ Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
Map<ImmutableByteArray, Long> oldSequenceIds = null;
Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
synchronized (tieLock) {
@@ -273,9 +281,14 @@ class SequenceIdAccounting {
// circumstance because another concurrent thread now may add sequenceids for this family
// (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
// is fine because updates are blocked when this method is called. Make sure!!!
- for (byte[] familyName : families) {
- ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
- Long seqId = m.remove(familyNameWrapper);
+ for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
+ ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey());
+ Long seqId = null;
+ if(entry.getValue() == HConstants.NO_SEQNUM) {
+ seqId = m.remove(familyNameWrapper);
+ } else {
+ seqId = m.replace(familyNameWrapper, entry.getValue());
+ }
if (seqId != null) {
if (oldSequenceIds == null) {
oldSequenceIds = new HashMap<>();
@@ -344,7 +357,7 @@ class SequenceIdAccounting {
if (flushing != null) {
for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {
Long currentId = tmpMap.get(e.getKey());
- if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
+ if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
String errorStr = Bytes.toString(encodedRegionName) + " family "
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
+ currentId + ", previous oldest unflushed id=" + e.getValue();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 337f2b4..8f224fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,6 +196,11 @@ class DisabledWALProvider implements WALProvider {
sync();
}
+ public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
+ flushedFamilyNamesToSeq) {
+ return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
+ }
+
@Override
public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
if (closed.get()) return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 030d8b6..b7adc60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -161,6 +161,8 @@ public interface WAL extends Closeable {
*/
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
+ Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> familyToSeq);
+
/**
* Complete the cache flush.
* @param encodedRegionName Encoded region name.