You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2014/06/07 03:19:55 UTC
[2/2] git commit: hbase-8763: Combine MVCC and SeqId
hbase-8763: Combine MVCC and SeqId
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c682d57e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c682d57e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c682d57e
Branch: refs/heads/master
Commit: c682d57e92d9f18a02e1fe8dc50c5caa116e5d4a
Parents: d6cc2fb
Author: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Authored: Fri Jun 6 18:25:46 2014 -0700
Committer: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Committed: Fri Jun 6 18:25:46 2014 -0700
----------------------------------------------------------------------
.../hbase/regionserver/DefaultMemStore.java | 17 +-
.../hadoop/hbase/regionserver/HRegion.java | 413 +++++++++++--------
.../hadoop/hbase/regionserver/HStore.java | 5 +-
.../hadoop/hbase/regionserver/MemStore.java | 6 +-
.../MultiVersionConsistencyControl.java | 187 ++++++---
.../hbase/regionserver/SequenceNumber.java | 31 ++
.../apache/hadoop/hbase/regionserver/Store.java | 5 +-
.../hadoop/hbase/regionserver/StoreFlusher.java | 6 -
.../hadoop/hbase/regionserver/wal/FSHLog.java | 67 +--
.../hbase/regionserver/wal/FSWALEntry.java | 31 +-
.../hadoop/hbase/regionserver/wal/HLog.java | 11 +-
.../hadoop/hbase/regionserver/wal/HLogKey.java | 45 +-
.../hbase/regionserver/wal/HLogSplitter.java | 4 +-
.../hadoop/hbase/regionserver/wal/HLogUtil.java | 2 +-
.../hadoop/hbase/regionserver/wal/WALEdit.java | 2 +-
.../hadoop/hbase/client/TestMultiParallel.java | 4 +-
.../hbase/regionserver/TestDefaultMemStore.java | 23 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 17 +-
.../TestMultiVersionConsistencyControl.java | 4 +-
.../hadoop/hbase/regionserver/TestStore.java | 12 +-
20 files changed, 554 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 51f22d8..ad084a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@@ -210,12 +211,13 @@ public class DefaultMemStore implements MemStore {
/**
* Write an update
* @param cell
- * @return approximate size of the passed key and value.
+ * @return approximate size of the passed KV & the newly added KV, which may be different from the
+ * passed-in KV
*/
@Override
- public long add(Cell cell) {
+ public Pair<Long, Cell> add(Cell cell) {
KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell));
- return internalAdd(toAdd);
+ return new Pair<Long, Cell>(internalAdd(toAdd), toAdd);
}
@Override
@@ -1051,18 +1053,21 @@ public class DefaultMemStore implements MemStore {
byte [] empty = new byte[0];
for (int i = 0; i < count; i++) {
// Give each its own ts
- size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ size += ret.getFirst();
}
LOG.info("memstore1 estimated size=" + size);
for (int i = 0; i < count; i++) {
- size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ size += ret.getFirst();
}
LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
// Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore();
for (int i = 0; i < count; i++) {
- size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
+ Pair<Long, Cell> ret = memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
new byte[i]));
+ size += ret.getFirst();
}
LOG.info("memstore2 estimated size=" + size);
final int seconds = 30;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0c4f7ee..dedaf41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -824,10 +824,11 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
- mvcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+ maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
+ mvcc.initialize(maxSeqId);
return maxSeqId;
}
@@ -1684,7 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{
// wal can be null replaying edits.
return wal != null?
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- getNextSequenceId(wal, startTime), "Nothing to flush"):
+ getNextSequenceId(wal), "Nothing to flush"):
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
}
} finally {
@@ -1714,58 +1715,64 @@ public class HRegion implements HeapSize { // , Writable{
getRegionInfo().getEncodedName());
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
long flushSeqId = -1L;
+
try {
- // Record the mvcc for all transactions in progress.
- w = mvcc.beginMemstoreInsert();
- mvcc.advanceMemstore(w);
- if (wal != null) {
- if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
- // This should never happen.
- String msg = "Flush will not be started for ["
- + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
- status.setStatus(msg);
- return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+ try {
+ w = mvcc.beginMemstoreInsert();
+ if (wal != null) {
+ if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+ // This should never happen.
+ String msg = "Flush will not be started for ["
+ + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+ status.setStatus(msg);
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+ }
+ // Get a sequence id that we can use to denote the flush. It will be one beyond the last
+ // edit that made it into the hfile (the below does not add an edit, it just asks the
+ // WAL system to return next sequence edit).
+ flushSeqId = getNextSequenceId(wal);
+ } else {
+ // use the provided sequence Id as WAL is not being used for this flush.
+ flushSeqId = myseqid;
}
- // Get a sequence id that we can use to denote the flush. It will be one beyond the last
- // edit that made it into the hfile (the below does not add an edit, it just asks the
- // WAL system to return next sequence edit).
- flushSeqId = getNextSequenceId(wal, startTime);
- } else {
- // use the provided sequence Id as WAL is not being used for this flush.
- flushSeqId = myseqid;
- }
- for (Store s : stores.values()) {
- totalFlushableSize += s.getFlushableSize();
- storeFlushCtxs.add(s.createFlushContext(flushSeqId));
- }
+ for (Store s : stores.values()) {
+ totalFlushableSize += s.getFlushableSize();
+ storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+ }
- // Prepare flush (take a snapshot)
- for (StoreFlushContext flush : storeFlushCtxs) {
- flush.prepare();
+ // Prepare flush (take a snapshot)
+ for (StoreFlushContext flush : storeFlushCtxs) {
+ flush.prepare();
+ }
+ } finally {
+ this.updatesLock.writeLock().unlock();
}
+ String s = "Finished memstore snapshotting " + this +
+ ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+ status.setStatus(s);
+ if (LOG.isTraceEnabled()) LOG.trace(s);
+ // sync unflushed WAL changes when deferred log sync is enabled
+ // see HBASE-8208 for details
+ if (wal != null && !shouldSyncLog()) wal.sync();
+
+ // wait for all in-progress transactions to commit to HLog before
+ // we can start the flush. This prevents
+ // uncommitted transactions from being written into HFiles.
+ // We have to block before we start the flush, otherwise keys that
+ // were removed via a rollbackMemstore could be written to Hfiles.
+ mvcc.waitForPreviousTransactionsComplete(w);
+ // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
+ w = null;
+ s = "Flushing stores of " + this;
+ status.setStatus(s);
+ if (LOG.isTraceEnabled()) LOG.trace(s);
} finally {
- this.updatesLock.writeLock().unlock();
+ if (w != null) {
+ // in case of failure just mark current w as complete
+ mvcc.advanceMemstore(w);
+ }
}
- String s = "Finished memstore snapshotting " + this +
- ", syncing WAL and waiting on mvcc, flushSize=" + totalFlushableSize;
- status.setStatus(s);
- if (LOG.isTraceEnabled()) LOG.trace(s);
-
- // sync unflushed WAL changes when deferred log sync is enabled
- // see HBASE-8208 for details
- if (wal != null && !shouldSyncLog()) wal.sync();
-
- // wait for all in-progress transactions to commit to HLog before
- // we can start the flush. This prevents
- // uncommitted transactions from being written into HFiles.
- // We have to block before we start the flush, otherwise keys that
- // were removed via a rollbackMemstore could be written to Hfiles.
- mvcc.waitForRead(w);
-
- s = "Flushing stores of " + this;
- status.setStatus(s);
- if (LOG.isTraceEnabled()) LOG.trace(s);
// Any failure from here on out will be catastrophic requiring server
// restart so hlog content can be replayed and put back into the memstore.
@@ -1849,13 +1856,9 @@ public class HRegion implements HeapSize { // , Writable{
* @return Next sequence number unassociated with any actual edit.
* @throws IOException
*/
- private long getNextSequenceId(final HLog wal, final long now) throws IOException {
- HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
- // Call append but with an empty WALEdit. The returned sequence id will not be associated
- // with any edit and we can be sure it went in after all outstanding appends.
- wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
- WALEdit.EMPTY_WALEDIT, this.sequenceId, false);
- return key.getLogSeqNum();
+ private long getNextSequenceId(final HLog wal) throws IOException {
+ HLogKey key = this.appendNoSyncNoAppend(wal, null);
+ return key.getSequenceNumber();
}
//////////////////////////////////////////////////////////////////////////////
@@ -2349,11 +2352,14 @@ public class HRegion implements HeapSize { // , Writable{
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
+ List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
boolean success = false;
int noOfPuts = 0, noOfDeletes = 0;
+ HLogKey walKey = null;
+ long mvccNum = 0;
try {
// ------------------------------------
// STEP 1. Try to acquire as many locks as we can, and ensure
@@ -2475,13 +2481,13 @@ public class HRegion implements HeapSize { // , Writable{
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
-
+ mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
//
// ------------------------------------
// Acquire the latest mvcc number
// ----------------------------------
- w = mvcc.beginMemstoreInsert();
-
+ w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
+
// calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp =
@@ -2506,13 +2512,12 @@ public class HRegion implements HeapSize { // , Writable{
continue;
}
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
- addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
}
// ------------------------------------
// STEP 4. Build WAL edit
// ----------------------------------
- boolean hasWalAppends = false;
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
@@ -2543,13 +2548,13 @@ public class HRegion implements HeapSize { // , Writable{
throw new IOException("Multiple nonces per batch and not in replay");
}
// txid should always increase, so having the one from the last call is ok.
- HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup,
- currentNonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key,
- walEdit, getSequenceId(), true);
- hasWalAppends = true;
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), now, m.getClusterIds(),
+ currentNonceGroup, currentNonce);
+ txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
+ walEdit, getSequenceId(), true, null);
walEdit = new WALEdit(isInReplay);
+ walKey = null;
}
currentNonceGroup = nonceGroup;
currentNonce = nonce;
@@ -2570,12 +2575,15 @@ public class HRegion implements HeapSize { // , Writable{
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) {
- HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
- currentNonceGroup, currentNonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit,
- getSequenceId(), true);
- hasWalAppends = true;
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
+ getSequenceId(), true, memstoreCells);
+ }
+ if(walKey == null){
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
+ walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
}
// -------------------------------
@@ -2590,9 +2598,10 @@ public class HRegion implements HeapSize { // , Writable{
// -------------------------
// STEP 7. Sync wal.
// -------------------------
- if (hasWalAppends) {
+ if (txid != 0) {
syncOrDefer(txid, durability);
}
+
doRollBackMemstore = false;
// calling the post CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
@@ -2606,7 +2615,7 @@ public class HRegion implements HeapSize { // , Writable{
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------
if (w != null) {
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
w = null;
}
@@ -2636,9 +2645,11 @@ public class HRegion implements HeapSize { // , Writable{
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
+ rollbackMemstore(memstoreCells);
+ }
+ if (w != null) {
+ mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
- if (w != null) mvcc.completeMemstoreInsert(w);
if (locked) {
this.updatesLock.readLock().unlock();
@@ -2727,7 +2738,7 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ mvcc.waitForPreviousTransactionsComplete();
try {
if (this.getCoprocessorHost() != null) {
Boolean processed = null;
@@ -2903,34 +2914,25 @@ public class HRegion implements HeapSize { // , Writable{
* @param familyMap Map of kvs per family
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a mvcc transaction.
+ * @param memstoreCells output list that collects the KVs newly added into memstore
* @return the additional memory usage of the memstore caused by the
* new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
- MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
+ long mvccNum, List<KeyValue> memstoreCells) {
long size = 0;
- boolean freemvcc = false;
- try {
- if (localizedWriteEntry == null) {
- localizedWriteEntry = mvcc.beginMemstoreInsert();
- freemvcc = true;
- }
-
- for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
- byte[] family = e.getKey();
- List<Cell> cells = e.getValue();
+ for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
+ byte[] family = e.getKey();
+ List<Cell> cells = e.getValue();
- Store store = getStore(family);
- for (Cell cell: cells) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
- size += store.add(kv);
- }
- }
- } finally {
- if (freemvcc) {
- mvcc.completeMemstoreInsert(localizedWriteEntry);
+ Store store = getStore(family);
+ for (Cell cell: cells) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ kv.setMvccVersion(mvccNum);
+ Pair<Long, Cell> ret = store.add(kv);
+ size += ret.getFirst();
+ memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
}
}
@@ -2942,35 +2944,16 @@ public class HRegion implements HeapSize { // , Writable{
* called when a Put/Delete has updated memstore but subsequently fails to update
* the wal. This method is then invoked to rollback the memstore.
*/
- private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
- Map<byte[], List<Cell>>[] familyMaps,
- int start, int end) {
+ private void rollbackMemstore(List<KeyValue> memstoreCells) {
int kvsRolledback = 0;
- for (int i = start; i < end; i++) {
- // skip over request that never succeeded in the first place.
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.SUCCESS) {
- continue;
- }
-
- // Rollback all the kvs for this row.
- Map<byte[], List<Cell>> familyMap = familyMaps[i];
- for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
- byte[] family = e.getKey();
- List<Cell> cells = e.getValue();
-
- // Remove those keys from the memstore that matches our
- // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
- // that even the memstoreTS has to match for keys that will be rolled-back.
- Store store = getStore(family);
- for (Cell cell: cells) {
- store.rollback(KeyValueUtil.ensureKeyValue(cell));
- kvsRolledback++;
- }
- }
+
+ for (KeyValue kv : memstoreCells) {
+ byte[] family = kv.getFamily();
+ Store store = getStore(family);
+ store.rollback(kv);
+ kvsRolledback++;
}
- LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
- " keyvalues from start:" + start + " to end:" + end);
+ LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
}
/**
@@ -3378,7 +3361,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return True if we should flush.
*/
protected boolean restoreEdit(final Store s, final KeyValue kv) {
- long kvSize = s.add(kv);
+ long kvSize = s.add(kv).getFirst();
if (this.rsAccounting != null) {
rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
}
@@ -4883,7 +4866,10 @@ public class HRegion implements HeapSize { // , Writable{
List<RowLock> acquiredRowLocks;
long addedSize = 0;
List<KeyValue> mutations = new ArrayList<KeyValue>();
+ List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
+ long mvccNum = 0;
+ HLogKey walKey = null;
try {
// 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -4894,6 +4880,7 @@ public class HRegion implements HeapSize { // , Writable{
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size());
locked = true;
+ mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
long now = EnvironmentEdgeManager.currentTimeMillis();
try {
@@ -4904,27 +4891,35 @@ public class HRegion implements HeapSize { // , Writable{
if (!mutations.isEmpty()) {
// 5. Get a mvcc write number
- writeEntry = mvcc.beginMemstoreInsert();
+ writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// 6. Apply to memstore
for (KeyValue kv : mutations) {
- kv.setMvccVersion(writeEntry.getWriteNumber());
+ kv.setMvccVersion(mvccNum);
Store store = getStore(kv);
if (store == null) {
checkFamily(CellUtil.cloneFamily(kv));
// unreachable
}
- addedSize += store.add(kv);
+ Pair<Long, Cell> ret = store.add(kv);
+ addedSize += ret.getFirst();
+ memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
}
long txid = 0;
// 7. Append no sync
if (!walEdit.isEmpty()) {
- HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup,
- nonce);
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ processor.getClusterIds(), nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
- key, walEdit, getSequenceId(), true);
+ walKey, walEdit, getSequenceId(), true, memstoreCells);
}
+ if(walKey == null){
+ // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+ // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
+ walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ }
+
// 8. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
@@ -4951,7 +4946,7 @@ public class HRegion implements HeapSize { // , Writable{
}
// 11. Roll mvcc forward
if (writeEntry != null) {
- mvcc.completeMemstoreInsert(writeEntry);
+ mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
}
if (locked) {
this.updatesLock.readLock().unlock();
@@ -5055,8 +5050,12 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row
startRegionOperation(Operation.APPEND);
this.writeRequestsCount.increment();
+ long mvccNum = 0;
WriteEntry w = null;
- RowLock rowLock;
+ HLogKey walKey = null;
+ RowLock rowLock = null;
+ List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
+ boolean doRollBackMemstore = false;
try {
rowLock = getRowLock(row);
try {
@@ -5064,7 +5063,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ mvcc.waitForPreviousTransactionsComplete();
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preAppendAfterRowLock(append);
if(r!= null) {
@@ -5072,7 +5071,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// now start my own transaction
- w = mvcc.beginMemstoreInsert();
+ mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@@ -5140,7 +5140,7 @@ public class HRegion implements HeapSize { // , Writable{
// so only need to update the timestamp to 'now'
newKV.updateLatestStamp(Bytes.toBytes(now));
}
- newKV.setMvccVersion(w.getWriteNumber());
+ newKV.setMvccVersion(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -5161,34 +5161,43 @@ public class HRegion implements HeapSize { // , Writable{
tempMemstore.put(store, kvs);
}
- // Actually write to WAL now
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits,
- this.sequenceId, true);
- } else {
- recordMutationWithoutWal(append.getFamilyCellMap());
- }
-
//Actually write to Memstore now
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
} else {
// otherwise keep older versions around
for (Cell cell: entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- size += store.add(kv);
+ Pair<Long, Cell> ret = store.add(kv);
+ size += ret.getFirst();
+ memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
+ doRollBackMemstore = true;
}
}
allKVs.addAll(entry.getValue());
}
+
+ // Actually write to WAL now
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
+ this.sequenceId, true, memstoreCells);
+ } else {
+ recordMutationWithoutWal(append.getFamilyCellMap());
+ }
+ if(walKey == null){
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
+ walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ }
+
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@@ -5196,14 +5205,23 @@ public class HRegion implements HeapSize { // , Writable{
}
} finally {
rowLock.release();
+ rowLock = null;
}
- if (writeToWAL) {
- // sync the transaction log outside the rowlock
+ // sync the transaction log outside the rowlock
+ if(txid != 0){
syncOrDefer(txid, durability);
}
+ doRollBackMemstore = false;
} finally {
+ if (rowLock != null) {
+ rowLock.release();
+ }
+ // if the wal sync was unsuccessful, remove keys from memstore
+ if (doRollBackMemstore) {
+ rollbackMemstore(memstoreCells);
+ }
if (w != null) {
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
closeRegionOperation(Operation.APPEND);
}
@@ -5250,15 +5268,20 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row
startRegionOperation(Operation.INCREMENT);
this.writeRequestsCount.increment();
+ RowLock rowLock = null;
WriteEntry w = null;
+ HLogKey walKey = null;
+ long mvccNum = 0;
+ List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
+ boolean doRollBackMemstore = false;
try {
- RowLock rowLock = getRowLock(row);
+ rowLock = getRowLock(row);
try {
lock(this.updatesLock.readLock());
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ mvcc.waitForPreviousTransactionsComplete();
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
if (r != null) {
@@ -5266,7 +5289,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// now start my own transaction
- w = mvcc.beginMemstoreInsert();
+ mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte [], List<Cell>> family:
@@ -5330,7 +5354,7 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
}
- newKV.setMvccVersion(w.getWriteNumber());
+ newKV.setMvccVersion(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -5357,20 +5381,6 @@ public class HRegion implements HeapSize { // , Writable{
}
}
- // Actually write to WAL now
- if (walEdits != null && !walEdits.isEmpty()) {
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
- key, walEdits, getSequenceId(), true);
- } else {
- recordMutationWithoutWal(increment.getFamilyCellMap());
- }
- }
//Actually write to Memstore now
if (!tempMemstore.isEmpty()) {
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
@@ -5378,30 +5388,62 @@ public class HRegion implements HeapSize { // , Writable{
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
} else {
// otherwise keep older versions around
for (Cell cell : entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- size += store.add(kv);
+ Pair<Long, Cell> ret = store.add(kv);
+ size += ret.getFirst();
+ memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
+ doRollBackMemstore = true;
}
}
}
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
}
+
+ // Actually write to WAL now
+ if (walEdits != null && !walEdits.isEmpty()) {
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ walKey, walEdits, getSequenceId(), true, memstoreCells);
+ } else {
+ recordMutationWithoutWal(increment.getFamilyCellMap());
+ }
+ }
+ if(walKey == null){
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+ walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ }
} finally {
this.updatesLock.readLock().unlock();
}
} finally {
rowLock.release();
+ rowLock = null;
}
- if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
- // sync the transaction log outside the rowlock
+ // sync the transaction log outside the rowlock
+ if(txid != 0){
syncOrDefer(txid, durability);
}
+ doRollBackMemstore = false;
} finally {
+ if (rowLock != null) {
+ rowLock.release();
+ }
+ // if the wal sync was unsuccessful, remove keys from memstore
+ if (doRollBackMemstore) {
+ rollbackMemstore(memstoreCells);
+ }
if (w != null) {
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) {
@@ -6130,4 +6172,23 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
+
+ /**
+ * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
+ * the WALEdit append later.
+ * @param wal
+ * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to
+ * be updated with the right mvcc values (their log sequence number)
+ * @return
+ * @throws IOException
+ */
+ private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
+ HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
+ HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ // Call append but with an empty WALEdit. The returned sequence id will not be associated
+ // with any edit and we can be sure it went in after all outstanding appends.
+ wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
+ WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
+ return key;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6de94c8..2218244 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -564,10 +565,10 @@ public class HStore implements Store {
}
@Override
- public long add(final KeyValue kv) {
+ public Pair<Long, Cell> add(final KeyValue kv) {
lock.readLock().lock();
try {
- return this.memstore.add(kv);
+ return this.memstore.add(kv);
} finally {
lock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index b370b49..ac2155a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.util.Pair;
/**
* The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
@@ -61,9 +62,10 @@ public interface MemStore extends HeapSize {
/**
* Write an update
* @param cell
- * @return approximate size of the passed key and value.
+ * @return approximate size of the passed KV and the newly added KV which may be different from the
+ * passed in KV.
*/
- long add(final Cell cell);
+ Pair<Long, Cell> add(final Cell cell);
/**
* @return Oldest timestamp of all the Cells in the MemStore
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index b46d55b..4343313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@@ -32,20 +34,18 @@ import org.apache.hadoop.hbase.util.ClassSize;
*/
@InterfaceAudience.Private
public class MultiVersionConsistencyControl {
- private volatile long memstoreRead = 0;
- private volatile long memstoreWrite = 0;
-
+ private static final long NO_WRITE_NUMBER = 0;
+ private volatile long memstoreRead = 0;
private final Object readWaiters = new Object();
// This is the pending queue of writes.
private final LinkedList<WriteEntry> writeQueue =
new LinkedList<WriteEntry>();
-
+
/**
* Default constructor. Initializes the memstoreRead/Write points to 0.
*/
public MultiVersionConsistencyControl() {
- this.memstoreRead = this.memstoreWrite = 0;
}
/**
@@ -54,37 +54,86 @@ public class MultiVersionConsistencyControl {
*/
public void initialize(long startPoint) {
synchronized (writeQueue) {
- if (this.memstoreWrite != this.memstoreRead) {
- throw new RuntimeException("Already used this mvcc. Too late to initialize");
- }
-
- this.memstoreRead = this.memstoreWrite = startPoint;
+ writeQueue.clear();
+ memstoreRead = startPoint;
}
}
/**
- * Generate and return a {@link WriteEntry} with a new write number.
- * To complete the WriteEntry and wait for it to be visible,
- * call {@link #completeMemstoreInsert(WriteEntry)}.
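+ * Begin a MVCC transaction using NO_WRITE_NUMBER as the initial write number.
+ * The initial value is a placeholder that is expected to be reset later.
+ * @return WriteEntry a WriteEntry instance created with NO_WRITE_NUMBER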
*/
- public WriteEntry beginMemstoreInsert() {
+ WriteEntry beginMemstoreInsert() {
+ return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+ }
+
+ /**
+ * Get a mvcc write number before the actual one (its log sequence id) is assigned
+ * @param sequenceId
+ * @return long a faked write number which is big enough not to be seen by others before a real
+ * one is assigned
+ */
+ public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
+ // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
+ // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
+ // because each handler could increment sequence num twice and max concurrent in-flight
+ // transactions is the number of RPC handlers.
+ // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
+ // changes touch same row key
+ // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
+ // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
+ return sequenceId.incrementAndGet() + 1000000000;
+ }
+
+ /**
+ * This function starts a MVCC transaction with current region's log change sequence number. Since
+ * we set change sequence number when flushing current change to WAL(late binding), the flush
+ * order may differ from the order to start a MVCC transaction. For example, a change that begins
+ * MVCC first may complete later than a change which starts MVCC at a later time. Therefore, we
+ * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
+ * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
+ * big number is safe because we only need it to prevent current change being seen and the number
+ * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
+ * for MVCC to align with flush sequence.
+ * @param curSeqNum
+ * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+ */
+ public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
+ WriteEntry e = new WriteEntry(curSeqNum);
synchronized (writeQueue) {
- long nextWriteNumber = ++memstoreWrite;
- WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
/**
- * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
- *
- * At the end of this call, the global read point is at least as large as the write point
- * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
+ * Complete a {@link WriteEntry} that was created by
+ * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
+ * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
+ * visible to MVCC readers.
+ * @throws IOException
+ */
+ public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum)
+ throws IOException {
+ if(e == null) return;
+ if (seqNum != null) {
+ e.setWriteNumber(seqNum.getSequenceNumber());
+ } else {
+ // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
+ // function beginMemstoreInsertWithSeqNum in case of failures
+ e.setWriteNumber(NO_WRITE_NUMBER);
+ }
+ waitForPreviousTransactionsComplete(e);
+ }
+
+ /**
+ * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
+ * end of this call, the global read point is at least as large as the write point of the passed
+ * in WriteEntry. Thus, the write is visible to MVCC readers.
*/
public void completeMemstoreInsert(WriteEntry e) {
- advanceMemstore(e);
- waitForRead(e);
+ waitForPreviousTransactionsComplete(e);
}
/**
@@ -99,75 +148,94 @@ public class MultiVersionConsistencyControl {
* @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
*/
boolean advanceMemstore(WriteEntry e) {
+ long nextReadValue = -1;
synchronized (writeQueue) {
e.markCompleted();
- long nextReadValue = -1;
- boolean ranOnce=false;
while (!writeQueue.isEmpty()) {
- ranOnce=true;
WriteEntry queueFirst = writeQueue.getFirst();
-
- if (nextReadValue > 0) {
- if (nextReadValue+1 != queueFirst.getWriteNumber()) {
- throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
- + nextReadValue + " next: " + queueFirst.getWriteNumber());
- }
- }
-
if (queueFirst.isCompleted()) {
- nextReadValue = queueFirst.getWriteNumber();
+ // Use max because edits complete in WAL sync order, not in arrival order
+ nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
writeQueue.removeFirst();
} else {
break;
}
}
- if (!ranOnce) {
- throw new RuntimeException("never was a first");
+ if (nextReadValue > memstoreRead) {
+ memstoreRead = nextReadValue;
}
- if (nextReadValue > 0) {
- synchronized (readWaiters) {
- memstoreRead = nextReadValue;
- readWaiters.notifyAll();
- }
- }
- if (memstoreRead >= e.getWriteNumber()) {
- return true;
+ // notify waiters on writeQueue before return
+ writeQueue.notifyAll();
+ }
+
+ if (nextReadValue > 0) {
+ synchronized (readWaiters) {
+ readWaiters.notifyAll();
}
- return false;
}
+
+ if (memstoreRead >= e.getWriteNumber()) {
+ return true;
+ }
+ return false;
}
/**
- * Wait for the global readPoint to advance upto
- * the specified transaction number.
+ * Wait for all previous MVCC transactions to complete
*/
- public void waitForRead(WriteEntry e) {
+ public void waitForPreviousTransactionsComplete() {
+ WriteEntry w = beginMemstoreInsert();
+ waitForPreviousTransactionsComplete(w);
+ }
+
+ public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
boolean interrupted = false;
- synchronized (readWaiters) {
- while (memstoreRead < e.getWriteNumber()) {
- try {
- readWaiters.wait(0);
- } catch (InterruptedException ie) {
- // We were interrupted... finish the loop -- i.e. cleanup --and then
- // on our way out, reset the interrupt flag.
- interrupted = true;
+ WriteEntry w = waitedEntry;
+
+ try {
+ WriteEntry firstEntry = null;
+ do {
+ synchronized (writeQueue) {
+ // writeQueue won't be empty at this point, the following is just a safety check
+ if (writeQueue.isEmpty()) {
+ break;
+ }
+ firstEntry = writeQueue.getFirst();
+ if (firstEntry == w) {
+ // all previous in-flight transactions are done
+ break;
+ }
+ try {
+ writeQueue.wait(0);
+ } catch (InterruptedException ie) {
+ // We were interrupted... finish the loop -- i.e. cleanup --and then
+ // on our way out, reset the interrupt flag.
+ interrupted = true;
+ break;
+ }
}
+ } while (firstEntry != null);
+ } finally {
+ if (w != null) {
+ advanceMemstore(w);
}
}
- if (interrupted) Thread.currentThread().interrupt();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
public long memstoreReadPoint() {
return memstoreRead;
}
-
public static class WriteEntry {
private long writeNumber;
private boolean completed = false;
+
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
@@ -180,6 +248,9 @@ public class MultiVersionConsistencyControl {
long getWriteNumber() {
return this.writeNumber;
}
+ void setWriteNumber(long val){
+ this.writeNumber = val;
+ }
}
public static final long FIXED_SIZE = ClassSize.align(
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java
new file mode 100644
index 0000000..90b1029
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Interface which abstracts implementations on log sequence number assignment
+ */
+@InterfaceAudience.Private
+public interface SequenceNumber {
+ public long getSequenceNumber() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 8923769..fd73f2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or
@@ -122,9 +123,9 @@ public interface Store extends HeapSize, StoreConfigInformation {
/**
* Adds a value to the memstore
* @param kv
- * @return memstore size delta
+ * @return memstore size delta and newly added KV which may be different from the passed in KV
*/
- long add(KeyValue kv);
+ Pair<Long, Cell> add(KeyValue kv);
/**
* When was the last edit done in the memstore
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index b876972..7403700 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -121,12 +121,6 @@ abstract class StoreFlusher {
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- if (kv.getMvccVersion() <= smallestReadPoint) {
- // let us not change the original KV. It could be in the memstore
- // changing its memstoreTS could affect other threads/scanners.
- kv = kv.shallowCopy();
- kv.setMvccVersion(0);
- }
sink.append(kv);
}
kvs.clear();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 268e302..c0c7dbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1064,13 +1064,26 @@ class FSHLog implements HLog, Syncable {
}
}
+ /**
+ * @param now
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tableName
+ * @param clusterIds that have consumed the change
+ * @return New log key.
+ */
+ protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
+ long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
+ }
+
@Override
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, AtomicLong sequenceId)
throws IOException {
HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now);
- append(htd, info, logKey, edits, sequenceId, true, true);
+ append(htd, info, logKey, edits, sequenceId, true, true, null);
}
@Override
@@ -1079,14 +1092,15 @@ class FSHLog implements HLog, Syncable {
boolean inMemstore, long nonceGroup, long nonce) throws IOException {
HLogKey logKey =
new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
- return append(htd, info, logKey, edits, sequenceId, false, inMemstore);
+ return append(htd, info, logKey, edits, sequenceId, false, inMemstore, null);
}
@Override
public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
- final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore)
+ final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
+ final List<KeyValue> memstoreKVs)
throws IOException {
- return append(htd, info, key, edits, sequenceId, false, inMemstore);
+ return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs);
}
/**
@@ -1101,19 +1115,22 @@ class FSHLog implements HLog, Syncable {
* @param sync shall we sync after we call the append?
* @param inMemstore
* @param sequenceId The region sequence id reference.
+ * @param memstoreKVs
* @return txid of this transaction or if nothing to do, the last txid
* @throws IOException
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
- WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore)
+ WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore,
+ List<KeyValue> memstoreKVs)
throws IOException {
if (!this.enabled) return this.highestUnsyncedSequence;
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
TraceScope scope = Trace.startSpan("FSHLog.append");
+
// This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
// all this to make a key and then below to append the edit, we need to carry htd, info,
// etc. all over the ring buffer.
@@ -1124,19 +1141,10 @@ class FSHLog implements HLog, Syncable {
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
// latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri);
+ entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
- // Now wait until the region edit/sequence id is available. The 'entry' has an internal
- // latch that is thrown when the region edit/sequence id is set. Calling
- // entry.getRegionSequenceId will cause us block until the latch is thrown. The return is
- // the region edit/sequence id, not the ring buffer txid.
- try {
- entry.getRegionSequenceId();
- } catch (InterruptedException e) {
- throw convertInterruptedExceptionToIOException(e);
- }
}
// doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after
// all edits on a handler have been added.
@@ -1894,6 +1902,14 @@ class FSHLog implements HLog, Syncable {
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
+
+ // Edits are empty, there is nothing to append. May be empty when we are looking for a
+ // region sequence id only, a region edit/sequence id that is not associated with an actual
+ // edit. It has to go through all the rigmarole to be sure we have the right ordering.
+ if (entry.getEdit().isEmpty()) {
+ return;
+ }
+
// Coprocessor hook.
if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
entry.getEdit())) {
@@ -1909,19 +1925,16 @@ class FSHLog implements HLog, Syncable {
entry.getEdit());
}
}
- // If empty, there is nothing to append. Maybe empty when we are looking for a region
- // sequence id only, a region edit/sequence id that is not associated with an actual edit.
- // It has to go through all the rigmarole to be sure we have the right ordering.
- if (!entry.getEdit().isEmpty()) {
- writer.append(entry);
- assert highestUnsyncedSequence < entry.getSequence();
- highestUnsyncedSequence = entry.getSequence();
- Long lRegionSequenceId = Long.valueOf(regionSequenceId);
- highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
- if (entry.isInMemstore()) {
- oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
- }
+
+ writer.append(entry);
+ assert highestUnsyncedSequence < entry.getSequence();
+ highestUnsyncedSequence = entry.getSequence();
+ Long lRegionSequenceId = Long.valueOf(regionSequenceId);
+ highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
+ if (entry.isInMemstore()) {
+ oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
}
+
coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTimeMillis() - start);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 9799269..1e9472a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
/**
* A WAL Entry for {@link FSHLog} implementation. Immutable.
@@ -41,19 +43,18 @@ class FSWALEntry extends HLog.Entry {
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
- // Latch that is set on creation and then is undone on the other side of the ring buffer by the
- // consumer thread just after it sets the region edit/sequence id in here.
- private final transient CountDownLatch latch = new CountDownLatch(1);
+ private final transient List<KeyValue> memstoreKVs;
FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
- final HTableDescriptor htd, final HRegionInfo hri) {
+ final HTableDescriptor htd, final HRegionInfo hri, List<KeyValue> memstoreKVs) {
super(key, edit);
this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
+ this.memstoreKVs = memstoreKVs;
}
public String toString() {
@@ -90,15 +91,13 @@ class FSWALEntry extends HLog.Entry {
*/
long stampRegionSequenceId() {
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
- getKey().setLogSeqNum(regionSequenceId);
- // On creation, a latch was set. Count it down when sequence id is set. This will free
- // up anyone blocked on {@link #getRegionSequenceId()}
- this.latch.countDown();
+ if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
+ for(KeyValue kv : this.memstoreKVs){
+ kv.setMvccVersion(regionSequenceId);
+ }
+ }
+ HLogKey key = getKey();
+ key.setLogSeqNum(regionSequenceId);
return regionSequenceId;
}
-
- long getRegionSequenceId() throws InterruptedException {
- this.latch.await();
- return getKey().getLogSeqNum();
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index ca6f444..99a9a5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.Writable;
@@ -290,8 +292,8 @@ public interface HLog {
* @param sequenceId
* @throws IOException
* @deprecated For tests only and even then, should use
- * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
- * and {@link #sync()} instead.
+ * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
+ * List)} and {@link #sync()} instead.
*/
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@@ -337,7 +339,7 @@ public interface HLog {
* able to sync an explicit edit only (the current default implementation syncs up to the time
* of the sync call syncing whatever is behind the sync).
* @throws IOException
- * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
+ * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, List)}
* instead because you can get back the region edit/sequenceid; it is set into the passed in
* <code>key</code>.
*/
@@ -361,12 +363,13 @@ public interface HLog {
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
+ * @param memstoreKVs list of KVs added into memstore
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
* @throws IOException
*/
long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore)
+ AtomicLong sequenceId, boolean inMemstore, List<KeyValue> memstoreKVs)
throws IOException;
// TODO: Do we need all these versions of sync?
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index f591f4e..ad1c001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,6 +32,10 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+
+import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.regionserver.SequenceNumber;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableComparable;
@@ -49,7 +55,6 @@ import org.apache.hadoop.io.WritableUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
-import com.google.protobuf.HBaseZeroCopyByteString;
/**
* A Key for an entry in the change log.
@@ -64,7 +69,7 @@ import com.google.protobuf.HBaseZeroCopyByteString;
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into HLogEntry.
@InterfaceAudience.Private
-public class HLogKey implements WritableComparable<HLogKey> {
+public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
public static final Log LOG = LogFactory.getLog(HLogKey.class);
// should be < 0 (@see #readFields(DataInput))
@@ -114,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
private byte [] encodedRegionName;
private TableName tablename;
private long logSeqNum;
+ private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
// Time at which this edit was written.
private long writeTime;
@@ -184,7 +190,8 @@ public class HLogKey implements WritableComparable<HLogKey> {
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce);
+ init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
+ nonceGroup, nonce);
}
/**
@@ -195,13 +202,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
+ * @param logSeqNum
* @param nonceGroup
* @param nonce
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup,
- long nonce) {
- init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID,
- EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce);
+ public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
+ long nonceGroup, long nonce) {
+ init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
+ EMPTY_UUIDS, nonceGroup, nonce);
}
protected void init(final byte [] encodedRegionName, final TableName tablename,
@@ -238,11 +246,30 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
/**
- * Allow that the log sequence id to be set post-construction.
+ * Allow the log sequence id to be set post-construction and release all waiters on the assigned
+ * sequence number.
* @param sequence
*/
void setLogSeqNum(final long sequence) {
this.logSeqNum = sequence;
+ this.seqNumAssignedLatch.countDown();
+ }
+
+ /**
+ * Wait until the sequence number is assigned and return the assigned value.
+ * @return the newly assigned sequence number
+ * @throws InterruptedIOException if the thread is interrupted while waiting for the assignment
+ */
+ public long getSequenceNumber() throws IOException {
+ try {
+ this.seqNumAssignedLatch.await();
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread interrupted waiting for next log sequence number");
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ return this.logSeqNum;
}
/**
@@ -358,7 +385,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
if (result == 0) {
if (this.logSeqNum < o.logSeqNum) {
result = -1;
- } else if (this.logSeqNum > o.logSeqNum ) {
+ } else if (this.logSeqNum > o.logSeqNum) {
result = 1;
}
if (result == 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 00e9f15..b9f82b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -1972,8 +1972,8 @@ public class HLogSplitter {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
- .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
- clusterIds, walKey.getNonceGroup(), walKey.getNonce());
+ .getTableName().toByteArray()), walKey.getLogSequenceNumber(),
+ walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
logEntry.setFirst(key);
logEntry.setSecond(val);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index fcb5610..6809aad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -262,7 +262,7 @@ public class HLogUtil {
final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false);
+ log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index b10c4a9..8ecb4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -249,7 +249,7 @@ public class WALEdit implements Writable, HeapSize {
sb.append(">]");
return sb.toString();
}
-
+
/**
* Create a compacion WALEdit
* @param c
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 98563d6..2e74281 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -68,7 +68,7 @@ public class TestMultiParallel {
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys();
- private static final int slaves = 2; // also used for testing HTable pool size
+ private static final int slaves = 3; // also used for testing HTable pool size
@BeforeClass public static void beforeClass() throws Exception {
((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -238,7 +238,7 @@ public class TestMultiParallel {
*
* @throws Exception
*/
- @Test (timeout=300000)
+ @Test (timeout=360000)
public void testFlushCommitsWithAbort() throws Exception {
LOG.info("test=testFlushCommitsWithAbort");
doTestFlushCommits(true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index f2db498..ebe95b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -61,6 +62,7 @@ public class TestDefaultMemStore extends TestCase {
private static final int QUALIFIER_COUNT = ROW_COUNT;
private static final byte [] FAMILY = Bytes.toBytes("column");
private MultiVersionConsistencyControl mvcc;
+ private AtomicLong startSeqNum = new AtomicLong(0);
@Override
public void setUp() throws Exception {
@@ -236,7 +238,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v = Bytes.toBytes("value");
MultiVersionConsistencyControl.WriteEntry w =
- mvcc.beginMemstoreInsert();
+ mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv1 = new KeyValue(row, f, q1, v);
kv1.setMvccVersion(w.getWriteNumber());
@@ -250,7 +252,7 @@ public class TestDefaultMemStore extends TestCase {
s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv1});
- w = mvcc.beginMemstoreInsert();
+ w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv2 = new KeyValue(row, f, q2, v);
kv2.setMvccVersion(w.getWriteNumber());
memstore.add(kv2);
@@ -280,7 +282,7 @@ public class TestDefaultMemStore extends TestCase {
// INSERT 1: Write both columns val1
MultiVersionConsistencyControl.WriteEntry w =
- mvcc.beginMemstoreInsert();
+ mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMvccVersion(w.getWriteNumber());
@@ -296,7 +298,7 @@ public class TestDefaultMemStore extends TestCase {
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2
- w = mvcc.beginMemstoreInsert();
+ w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv21 = new KeyValue(row, f, q1, v2);
kv21.setMvccVersion(w.getWriteNumber());
memstore.add(kv21);
@@ -332,7 +334,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1
MultiVersionConsistencyControl.WriteEntry w =
- mvcc.beginMemstoreInsert();
+ mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMvccVersion(w.getWriteNumber());
@@ -348,7 +350,7 @@ public class TestDefaultMemStore extends TestCase {
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START DELETE: Insert delete for one of the columns
- w = mvcc.beginMemstoreInsert();
+ w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
kvDel.setMvccVersion(w.getWriteNumber());
@@ -377,6 +379,7 @@ public class TestDefaultMemStore extends TestCase {
final MultiVersionConsistencyControl mvcc;
final MemStore memstore;
+ final AtomicLong startSeqNum;
AtomicReference<Throwable> caughtException;
@@ -384,12 +387,14 @@ public class TestDefaultMemStore extends TestCase {
public ReadOwnWritesTester(int id,
MemStore memstore,
MultiVersionConsistencyControl mvcc,
- AtomicReference<Throwable> caughtException)
+ AtomicReference<Throwable> caughtException,
+ AtomicLong startSeqNum)
{
this.mvcc = mvcc;
this.memstore = memstore;
this.caughtException = caughtException;
row = Bytes.toBytes(id);
+ this.startSeqNum = startSeqNum;
}
public void run() {
@@ -403,7 +408,7 @@ public class TestDefaultMemStore extends TestCase {
private void internalRun() throws IOException {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
MultiVersionConsistencyControl.WriteEntry w =
- mvcc.beginMemstoreInsert();
+ mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
// Insert the sequence value (i)
byte[] v = Bytes.toBytes(i);
@@ -433,7 +438,7 @@ public class TestDefaultMemStore extends TestCase {
AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
for (int i = 0; i < NUM_THREADS; i++) {
- threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught);
+ threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum);
threads[i].start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b2b4845..0f55b62 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -4152,15 +4152,16 @@ public class TestHRegion {
durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
// expect skip wal cases
- durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
- durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
- durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
- durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
- durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
- durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
+ durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false);
}
+ @SuppressWarnings("unchecked")
private void durabilityTest(String method, Durability tableDurability,
Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
final boolean expectSyncFromLogSyncer) throws Exception {
@@ -4183,7 +4184,7 @@ public class TestHRegion {
//verify append called or not
verify(log, expectAppend ? times(1) : never())
.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
- (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean());
+ (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<KeyValue>)any());
// verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {
@@ -4202,7 +4203,7 @@ public class TestHRegion {
}
});
} else {
- verify(log, never()).sync(anyLong());
+ //verify(log, never()).sync(anyLong());
verify(log, never()).sync();
}