You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/08/24 17:47:28 UTC
hbase git commit: HBASE-14273 Rename MVCC to MVCC: From
MultiVersionConsistencyControl to MultiVersionConcurrencyControl (Lars
Francke)
Repository: hbase
Updated Branches:
refs/heads/master eb52529c0 -> 9334a47d4
HBASE-14273 Rename MVCC to MVCC: From MultiVersionConsistencyControl to MultiVersionConcurrencyControl (Lars Francke)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9334a47d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9334a47d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9334a47d
Branch: refs/heads/master
Commit: 9334a47d4570f8adfc003f0fb2c5969a88c3bba0
Parents: eb52529
Author: stack <st...@apache.org>
Authored: Mon Aug 24 08:47:26 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Aug 24 08:47:26 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 26 +-
.../MultiVersionConcurrencyControl.java | 273 +++++++++++++++++++
.../MultiVersionConsistencyControl.java | 273 -------------------
.../hbase/regionserver/RegionScanner.java | 4 +-
.../hadoop/hbase/HBaseTestingUtility.java | 2 +-
.../hbase/regionserver/TestDefaultMemStore.java | 16 +-
.../TestMultiVersionConcurrencyControl.java | 135 +++++++++
.../TestMultiVersionConsistencyControl.java | 135 ---------
src/main/asciidoc/_chapters/architecture.adoc | 2 +-
9 files changed, 433 insertions(+), 433 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index df8bcf5..2293311 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -146,7 +146,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -584,8 +584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private boolean splitRequest;
private byte[] explicitSplitPoint = null;
- private final MultiVersionConsistencyControl mvcc =
- new MultiVersionConsistencyControl();
+ private final MultiVersionConcurrencyControl mvcc =
+ new MultiVersionConcurrencyControl();
// Coprocessor host
private RegionCoprocessorHost coprocessorHost;
@@ -1252,7 +1252,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- public MultiVersionConsistencyControl getMVCC() {
+ public MultiVersionConcurrencyControl getMVCC() {
return mvcc;
}
@@ -2081,7 +2081,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.memstoreSize.get() <= 0) {
// Take an update lock because am about to change the sequence id and we want the sequence id
// to be at the border of the empty memstore.
- MultiVersionConsistencyControl.WriteEntry w = null;
+ MultiVersionConcurrencyControl.WriteEntry w = null;
this.updatesLock.writeLock().lock();
try {
if (this.memstoreSize.get() <= 0) {
@@ -2137,7 +2137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// to do this for a moment. It is quick. We also set the memstore size to zero here before we
// allow updates again so its value will represent the size of the updates received
// during flush
- MultiVersionConsistencyControl.WriteEntry w = null;
+ MultiVersionConcurrencyControl.WriteEntry w = null;
// We have to take an update lock during snapshot, or else a write could end up in both snapshot
// and memstore (makes it difficult to do atomic rows then)
status.setStatus("Obtaining lock to block concurrent updates");
@@ -2853,7 +2853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
WALEdit walEdit = new WALEdit(isInReplay);
- MultiVersionConsistencyControl.WriteEntry w = null;
+ MultiVersionConcurrencyControl.WriteEntry w = null;
long txid = 0;
boolean doRollBackMemstore = false;
boolean locked = false;
@@ -3000,7 +3000,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if(isInReplay) {
mvccNum = batchOp.getReplaySequenceId();
} else {
- mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
}
//
// ------------------------------------
@@ -6635,7 +6635,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return;
}
- MultiVersionConsistencyControl.WriteEntry writeEntry = null;
+ MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
boolean locked;
boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
@@ -6656,7 +6656,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
locked = true;
// Get a mvcc write number
- mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
long now = EnvironmentEdgeManager.currentTime();
try {
@@ -6853,7 +6853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
// now start my own transaction
- mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTime();
// Process each family
@@ -7106,7 +7106,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
// now start my own transaction
- mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTime();
// Process each family
@@ -7332,7 +7332,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WriteState.HEAP_SIZE + // writestate
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
- MultiVersionConsistencyControl.FIXED_SIZE // mvcc
+ MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
+ ClassSize.TREEMAP // maxSeqIdInStores
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
new file mode 100644
index 0000000..028d81a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+@InterfaceAudience.Private
+public class MultiVersionConcurrencyControl {
+ private static final long NO_WRITE_NUMBER = 0;
+ private volatile long memstoreRead = 0;
+ private final Object readWaiters = new Object();
+
+ // This is the pending queue of writes.
+ private final LinkedList<WriteEntry> writeQueue =
+ new LinkedList<WriteEntry>();
+
+ /**
+ * Default constructor. Initializes the memstoreRead/Write points to 0.
+ */
+ public MultiVersionConcurrencyControl() {
+ }
+
+ /**
+ * Initializes the memstoreRead/Write points appropriately.
+ * @param startPoint
+ */
+ public void initialize(long startPoint) {
+ synchronized (writeQueue) {
+ writeQueue.clear();
+ memstoreRead = startPoint;
+ }
+ }
+
+ /**
+ *
+ * Begins a write entry whose write number starts as NO_WRITE_NUMBER and may be reset later.
+ * @return WriteEntry instance.
+ */
+ WriteEntry beginMemstoreInsert() {
+ return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+ }
+
+ /**
+ * Get an MVCC write number before the actual one (its log sequence id) is assigned.
+ * @param sequenceId
+ * @return long a faked write number which is big enough not to be seen by others before a real
+ * one is assigned
+ */
+ public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
+ // the 1 billion is just an arbitrarily big number to ensure no scanner will reach it before
+ // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
+ // because each handler could increment sequence num twice and max concurrent in-flight
+ // transactions is the number of RPC handlers.
+ // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
+ // changes touch same row key
+ // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
+ // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
+ return sequenceId.incrementAndGet() + 1000000000;
+ }
+
+ /**
+ * This function starts a MVCC transaction with current region's log change sequence number. Since
+ * we set change sequence number when flushing current change to WAL(late binding), the flush
+ * order may differ from the order in which MVCC transactions start. For example, a change that
+ * begins its MVCC transaction first may complete later than one that starts afterwards. Therefore, we
+ * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
+ * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
+ * big number is safe because we only need it to prevent current change being seen and the number
+ * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
+ * for MVCC to align with flush sequence.
+ * @param curSeqNum
+ * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+ */
+ public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
+ WriteEntry e = new WriteEntry(curSeqNum);
+ synchronized (writeQueue) {
+ writeQueue.add(e);
+ return e;
+ }
+ }
+
+ /**
+ * Complete a {@link WriteEntry} that was created by
+ * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
+ * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
+ * visible to MVCC readers.
+ * @throws IOException
+ */
+ public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
+ throws IOException {
+ if(e == null) return;
+ if (seqId != null) {
+ e.setWriteNumber(seqId.getSequenceId());
+ } else {
+ // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
+ // function advanceMemstore in case of failures
+ e.setWriteNumber(NO_WRITE_NUMBER);
+ }
+ waitForPreviousTransactionsComplete(e);
+ }
+
+ /**
+ * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
+ * end of this call, the global read point is at least as large as the write point of the passed
+ * in WriteEntry. Thus, the write is visible to MVCC readers.
+ */
+ public void completeMemstoreInsert(WriteEntry e) {
+ waitForPreviousTransactionsComplete(e);
+ }
+
+ /**
+ * Mark the {@link WriteEntry} as complete and advance the read point as
+ * much as possible.
+ *
+ * How much is the read point advanced?
+ * Let S be the set of all write numbers that are completed and where all previous write numbers
+ * are also completed. Then, the read point is advanced to the supremum of S.
+ *
+ * @param e
+ * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
+ */
+ boolean advanceMemstore(WriteEntry e) {
+ long nextReadValue = -1;
+ synchronized (writeQueue) {
+ e.markCompleted();
+
+ while (!writeQueue.isEmpty()) {
+ WriteEntry queueFirst = writeQueue.getFirst();
+ if (queueFirst.isCompleted()) {
+ // Use max because edits complete in WAL sync order, not in arrival order
+ nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+ writeQueue.removeFirst();
+ } else {
+ break;
+ }
+ }
+
+ if (nextReadValue > memstoreRead) {
+ memstoreRead = nextReadValue;
+ }
+
+ // notify waiters on writeQueue before return
+ writeQueue.notifyAll();
+ }
+
+ if (nextReadValue > 0) {
+ synchronized (readWaiters) {
+ readWaiters.notifyAll();
+ }
+ }
+
+ if (memstoreRead >= e.getWriteNumber()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Advances the current read point to be given seqNum if it is smaller than
+ * that.
+ */
+ void advanceMemstoreReadPointIfNeeded(long seqNum) {
+ synchronized (writeQueue) {
+ if (this.memstoreRead < seqNum) {
+ memstoreRead = seqNum;
+ }
+ }
+ }
+
+ /**
+ * Wait for all previous MVCC transactions to complete
+ */
+ public void waitForPreviousTransactionsComplete() {
+ WriteEntry w = beginMemstoreInsert();
+ waitForPreviousTransactionsComplete(w);
+ }
+
+ public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+ boolean interrupted = false;
+ WriteEntry w = waitedEntry;
+
+ try {
+ WriteEntry firstEntry = null;
+ do {
+ synchronized (writeQueue) {
+ // writeQueue won't be empty at this point, the following is just a safety check
+ if (writeQueue.isEmpty()) {
+ break;
+ }
+ firstEntry = writeQueue.getFirst();
+ if (firstEntry == w) {
+ // all previous in-flight transactions are done
+ break;
+ }
+ try {
+ writeQueue.wait(0);
+ } catch (InterruptedException ie) {
+ // We were interrupted... finish the loop -- i.e. cleanup --and then
+ // on our way out, reset the interrupt flag.
+ interrupted = true;
+ break;
+ }
+ }
+ } while (firstEntry != null);
+ } finally {
+ if (w != null) {
+ advanceMemstore(w);
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public long memstoreReadPoint() {
+ return memstoreRead;
+ }
+
+ public static class WriteEntry {
+ private long writeNumber;
+ private volatile boolean completed = false;
+
+ WriteEntry(long writeNumber) {
+ this.writeNumber = writeNumber;
+ }
+ void markCompleted() {
+ this.completed = true;
+ }
+ boolean isCompleted() {
+ return this.completed;
+ }
+ long getWriteNumber() {
+ return this.writeNumber;
+ }
+ void setWriteNumber(long val){
+ this.writeNumber = val;
+ }
+ }
+
+ public static final long FIXED_SIZE = ClassSize.align(
+ ClassSize.OBJECT +
+ 2 * Bytes.SIZEOF_LONG +
+ 2 * ClassSize.REFERENCE);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
deleted file mode 100644
index 96af2c3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-
-/**
- * Manages the read/write consistency within memstore. This provides
- * an interface for readers to determine what entries to ignore, and
- * a mechanism for writers to obtain new write numbers, then "commit"
- * the new writes for readers to read (thus forming atomic transactions).
- */
-@InterfaceAudience.Private
-public class MultiVersionConsistencyControl {
- private static final long NO_WRITE_NUMBER = 0;
- private volatile long memstoreRead = 0;
- private final Object readWaiters = new Object();
-
- // This is the pending queue of writes.
- private final LinkedList<WriteEntry> writeQueue =
- new LinkedList<WriteEntry>();
-
- /**
- * Default constructor. Initializes the memstoreRead/Write points to 0.
- */
- public MultiVersionConsistencyControl() {
- }
-
- /**
- * Initializes the memstoreRead/Write points appropriately.
- * @param startPoint
- */
- public void initialize(long startPoint) {
- synchronized (writeQueue) {
- writeQueue.clear();
- memstoreRead = startPoint;
- }
- }
-
- /**
- *
- * @param initVal The value we used initially and expected it'll be reset later
- * @return WriteEntry instance.
- */
- WriteEntry beginMemstoreInsert() {
- return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
- }
-
- /**
- * Get a mvcc write number before an actual one(its log sequence Id) being assigned
- * @param sequenceId
- * @return long a faked write number which is bigger enough not to be seen by others before a real
- * one is assigned
- */
- public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
- // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
- // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
- // because each handler could increment sequence num twice and max concurrent in-flight
- // transactions is the number of RPC handlers.
- // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
- // changes touch same row key
- // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
- // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
- return sequenceId.incrementAndGet() + 1000000000;
- }
-
- /**
- * This function starts a MVCC transaction with current region's log change sequence number. Since
- * we set change sequence number when flushing current change to WAL(late binding), the flush
- * order may differ from the order to start a MVCC transaction. For example, a change begins a
- * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
- * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
- * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
- * big number is safe because we only need it to prevent current change being seen and the number
- * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
- * for MVCC to align with flush sequence.
- * @param curSeqNum
- * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
- */
- public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
- WriteEntry e = new WriteEntry(curSeqNum);
- synchronized (writeQueue) {
- writeQueue.add(e);
- return e;
- }
- }
-
- /**
- * Complete a {@link WriteEntry} that was created by
- * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
- * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
- * visible to MVCC readers.
- * @throws IOException
- */
- public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
- throws IOException {
- if(e == null) return;
- if (seqId != null) {
- e.setWriteNumber(seqId.getSequenceId());
- } else {
- // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
- // function beginMemstoreInsertWithSeqNum in case of failures
- e.setWriteNumber(NO_WRITE_NUMBER);
- }
- waitForPreviousTransactionsComplete(e);
- }
-
- /**
- * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
- * end of this call, the global read point is at least as large as the write point of the passed
- * in WriteEntry. Thus, the write is visible to MVCC readers.
- */
- public void completeMemstoreInsert(WriteEntry e) {
- waitForPreviousTransactionsComplete(e);
- }
-
- /**
- * Mark the {@link WriteEntry} as complete and advance the read point as
- * much as possible.
- *
- * How much is the read point advanced?
- * Let S be the set of all write numbers that are completed and where all previous write numbers
- * are also completed. Then, the read point is advanced to the supremum of S.
- *
- * @param e
- * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
- */
- boolean advanceMemstore(WriteEntry e) {
- long nextReadValue = -1;
- synchronized (writeQueue) {
- e.markCompleted();
-
- while (!writeQueue.isEmpty()) {
- WriteEntry queueFirst = writeQueue.getFirst();
- if (queueFirst.isCompleted()) {
- // Using Max because Edit complete in WAL sync order not arriving order
- nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
- writeQueue.removeFirst();
- } else {
- break;
- }
- }
-
- if (nextReadValue > memstoreRead) {
- memstoreRead = nextReadValue;
- }
-
- // notify waiters on writeQueue before return
- writeQueue.notifyAll();
- }
-
- if (nextReadValue > 0) {
- synchronized (readWaiters) {
- readWaiters.notifyAll();
- }
- }
-
- if (memstoreRead >= e.getWriteNumber()) {
- return true;
- }
- return false;
- }
-
- /**
- * Advances the current read point to be given seqNum if it is smaller than
- * that.
- */
- void advanceMemstoreReadPointIfNeeded(long seqNum) {
- synchronized (writeQueue) {
- if (this.memstoreRead < seqNum) {
- memstoreRead = seqNum;
- }
- }
- }
-
- /**
- * Wait for all previous MVCC transactions complete
- */
- public void waitForPreviousTransactionsComplete() {
- WriteEntry w = beginMemstoreInsert();
- waitForPreviousTransactionsComplete(w);
- }
-
- public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
- boolean interrupted = false;
- WriteEntry w = waitedEntry;
-
- try {
- WriteEntry firstEntry = null;
- do {
- synchronized (writeQueue) {
- // writeQueue won't be empty at this point, the following is just a safety check
- if (writeQueue.isEmpty()) {
- break;
- }
- firstEntry = writeQueue.getFirst();
- if (firstEntry == w) {
- // all previous in-flight transactions are done
- break;
- }
- try {
- writeQueue.wait(0);
- } catch (InterruptedException ie) {
- // We were interrupted... finish the loop -- i.e. cleanup --and then
- // on our way out, reset the interrupt flag.
- interrupted = true;
- break;
- }
- }
- } while (firstEntry != null);
- } finally {
- if (w != null) {
- advanceMemstore(w);
- }
- }
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
-
- public long memstoreReadPoint() {
- return memstoreRead;
- }
-
- public static class WriteEntry {
- private long writeNumber;
- private volatile boolean completed = false;
-
- WriteEntry(long writeNumber) {
- this.writeNumber = writeNumber;
- }
- void markCompleted() {
- this.completed = true;
- }
- boolean isCompleted() {
- return this.completed;
- }
- long getWriteNumber() {
- return this.writeNumber;
- }
- void setWriteNumber(long val){
- this.writeNumber = val;
- }
- }
-
- public static final long FIXED_SIZE = ClassSize.align(
- ClassSize.OBJECT +
- 2 * Bytes.SIZEOF_LONG +
- 2 * ClassSize.REFERENCE);
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 9e7ff0f..5b33db4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -63,7 +63,7 @@ public interface RegionScanner extends InternalScanner, Shipper {
long getMaxResultSize();
/**
- * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl}
+ * @return The Scanner's MVCC readPt see {@link MultiVersionConcurrencyControl}
*/
long getMvccReadPoint();
@@ -94,7 +94,7 @@ public interface RegionScanner extends InternalScanner, Shipper {
* close a region operation, an synchronize on the scanner object. Example: <code>
* HRegion region = ...;
* RegionScanner scanner = ...
- * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ * MultiVersionConcurrencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
* region.startRegionOperation();
* try {
* synchronized(scanner) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index d8d9522..be5df71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -3538,7 +3538,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()),
- // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
+ // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
// readpoint 0.
0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 4848d66..e50260f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -69,13 +69,13 @@ public class TestDefaultMemStore extends TestCase {
private static final int ROW_COUNT = 10;
private static final int QUALIFIER_COUNT = ROW_COUNT;
private static final byte [] FAMILY = Bytes.toBytes("column");
- private MultiVersionConsistencyControl mvcc;
+ private MultiVersionConcurrencyControl mvcc;
private AtomicLong startSeqNum = new AtomicLong(0);
@Override
public void setUp() throws Exception {
super.setUp();
- this.mvcc = new MultiVersionConsistencyControl();
+ this.mvcc = new MultiVersionConcurrencyControl();
this.memstore = new DefaultMemStore();
}
@@ -248,7 +248,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v = Bytes.toBytes("value");
- MultiVersionConsistencyControl.WriteEntry w =
+ MultiVersionConcurrencyControl.WriteEntry w =
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv1 = new KeyValue(row, f, q1, v);
@@ -292,7 +292,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v2 = Bytes.toBytes("value2");
// INSERT 1: Write both columns val1
- MultiVersionConsistencyControl.WriteEntry w =
+ MultiVersionConcurrencyControl.WriteEntry w =
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
@@ -344,7 +344,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1
- MultiVersionConsistencyControl.WriteEntry w =
+ MultiVersionConcurrencyControl.WriteEntry w =
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
@@ -388,7 +388,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1");
- final MultiVersionConsistencyControl mvcc;
+ final MultiVersionConcurrencyControl mvcc;
final MemStore memstore;
final AtomicLong startSeqNum;
@@ -397,7 +397,7 @@ public class TestDefaultMemStore extends TestCase {
public ReadOwnWritesTester(int id,
MemStore memstore,
- MultiVersionConsistencyControl mvcc,
+ MultiVersionConcurrencyControl mvcc,
AtomicReference<Throwable> caughtException,
AtomicLong startSeqNum)
{
@@ -418,7 +418,7 @@ public class TestDefaultMemStore extends TestCase {
private void internalRun() throws IOException {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
- MultiVersionConsistencyControl.WriteEntry w =
+ MultiVersionConcurrencyControl.WriteEntry w =
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
// Insert the sequence value (i)
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
new file mode 100644
index 0000000..7b6e7b3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is a hammer test that verifies MultiVersionConcurrencyControl in a
+ * multiple writer single reader scenario: writer threads repeatedly begin and
+ * complete memstore inserts while a single reader checks that the read point
+ * it observes never moves backwards.
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMultiVersionConcurrencyControl extends TestCase {
+  /**
+   * Writer task. Until {@code finished} is set it opens a write entry, holds it
+   * for a short random time and completes it. A RuntimeException thrown while
+   * completing the entry sets {@code status} to false and ends the task.
+   */
+  static class Writer implements Runnable {
+    final AtomicBoolean finished;
+    final MultiVersionConcurrencyControl mvcc;
+    final AtomicBoolean status;
+
+    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) {
+      this.finished = finished;
+      this.mvcc = mvcc;
+      this.status = status;
+    }
+
+    private Random rnd = new Random();
+    // Nothing in this class reads or writes this flag; failures are reported
+    // through status instead. Left in place so the class shape is unchanged.
+    public boolean failed = false;
+
+    @Override
+    public void run() {
+      // Sequence numbers are local to this writer and start at 1.
+      AtomicLong startPoint = new AtomicLong();
+      while (!finished.get()) {
+        MultiVersionConcurrencyControl.WriteEntry e =
+            mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
+        // System.out.println("Begin write: " + e.getWriteNumber());
+        // Hold the entry open for 0 - 499 usec (nextInt(500) excludes 500).
+        int sleepTime = rnd.nextInt(500);
+        // sleepTime is in usec, the second argument of Thread.sleep is in ns:
+        // 1 * 1000 = 1,000 ns = 1 usec, 499 * 1000 = 499,000 ns = 499 usec.
+        try {
+          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
+        } catch (InterruptedException e1) {
+          // Ignored: fall through and complete the entry opened above.
+        }
+        try {
+          mvcc.completeMemstoreInsert(e);
+        } catch (RuntimeException ex) {
+          // Completion failed: log it, flag this writer as failed and stop.
+          System.out.println(ex.toString());
+          ex.printStackTrace();
+          status.set(false);
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs 20 writers and one reader against a single
+   * MultiVersionConcurrencyControl for ten seconds, then asserts that the
+   * reader never saw the read point decrease and that no writer failed.
+   */
+  public void testParallelism() throws Exception {
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+    final AtomicBoolean finished = new AtomicBoolean(false);
+
+    // fail flag for the reader thread
+    final AtomicBoolean readerFailed = new AtomicBoolean(false);
+    // read point at which the reader detected the regression (0 if none)
+    final AtomicLong failedAt = new AtomicLong();
+    Runnable reader = new Runnable() {
+      @Override
+      public void run() {
+        long prev = mvcc.memstoreReadPoint();
+        while (!finished.get()) {
+          long newPrev = mvcc.memstoreReadPoint();
+          if (newPrev < prev) {
+            // serious problem.
+            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
+            readerFailed.set(true);
+            // might as well give up
+            failedAt.set(newPrev);
+            return;
+          }
+          // Track the latest value so every read is compared with the one
+          // before it, not only with the very first read point.
+          prev = newPrev;
+        }
+      }
+    };
+
+    // writer thread parallelism.
+    int n = 20;
+    Thread[] writers = new Thread[n];
+    AtomicBoolean[] statuses = new AtomicBoolean[n];
+    Thread readThread = new Thread(reader);
+
+    for (int i = 0; i < n; ++i) {
+      statuses[i] = new AtomicBoolean(true);
+      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
+      writers[i].start();
+    }
+    readThread.start();
+
+    try {
+      Thread.sleep(10 * 1000);
+    } catch (InterruptedException ex) {
+      // Ignored: an early wake-up only shortens the run; shutdown proceeds below.
+    }
+
+    finished.set(true);
+
+    readThread.join();
+    for (int i = 0; i < n; ++i) {
+      writers[i].join();
+    }
+
+    // check failure.
+    assertFalse("Reader saw the read point go backwards to " + failedAt.get(),
+        readerFailed.get());
+    for (int i = 0; i < n; ++i) {
+      assertTrue("Writer " + i + " failed to complete a memstore insert", statuses[i].get());
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
deleted file mode 100644
index 09b2226..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.experimental.categories.Category;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This is a hammer test that verifies MultiVersionConsistencyControl in a
- * multiple writer single reader scenario.
- */
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestMultiVersionConsistencyControl extends TestCase {
- static class Writer implements Runnable {
- final AtomicBoolean finished;
- final MultiVersionConsistencyControl mvcc;
- final AtomicBoolean status;
-
- Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
- this.finished = finished;
- this.mvcc = mvcc;
- this.status = status;
- }
-
- private Random rnd = new Random();
- public boolean failed = false;
-
- public void run() {
- AtomicLong startPoint = new AtomicLong();
- while (!finished.get()) {
- MultiVersionConsistencyControl.WriteEntry e =
- mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
- // System.out.println("Begin write: " + e.getWriteNumber());
- // 10 usec - 500usec (including 0)
- int sleepTime = rnd.nextInt(500);
- // 500 * 1000 = 500,000ns = 500 usec
- // 1 * 100 = 100ns = 1usec
- try {
- if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
- } catch (InterruptedException e1) {
- }
- try {
- mvcc.completeMemstoreInsert(e);
- } catch (RuntimeException ex) {
- // got failure
- System.out.println(ex.toString());
- ex.printStackTrace();
- status.set(false);
- return;
- // Report failure if possible.
- }
- }
- }
- }
-
- public void testParallelism() throws Exception {
- final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
-
- final AtomicBoolean finished = new AtomicBoolean(false);
-
- // fail flag for the reader thread
- final AtomicBoolean readerFailed = new AtomicBoolean(false);
- final AtomicLong failedAt = new AtomicLong();
- Runnable reader = new Runnable() {
- public void run() {
- long prev = mvcc.memstoreReadPoint();
- while (!finished.get()) {
- long newPrev = mvcc.memstoreReadPoint();
- if (newPrev < prev) {
- // serious problem.
- System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
- readerFailed.set(true);
- // might as well give up
- failedAt.set(newPrev);
- return;
- }
- }
- }
- };
-
- // writer thread parallelism.
- int n = 20;
- Thread[] writers = new Thread[n];
- AtomicBoolean[] statuses = new AtomicBoolean[n];
- Thread readThread = new Thread(reader);
-
- for (int i = 0; i < n; ++i) {
- statuses[i] = new AtomicBoolean(true);
- writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
- writers[i].start();
- }
- readThread.start();
-
- try {
- Thread.sleep(10 * 1000);
- } catch (InterruptedException ex) {
- }
-
- finished.set(true);
-
- readThread.join();
- for (int i = 0; i < n; ++i) {
- writers[i].join();
- }
-
- // check failure.
- assertFalse(readerFailed.get());
- for (int i = 0; i < n; ++i) {
- assertTrue(statuses[i].get());
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/src/main/asciidoc/_chapters/architecture.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/architecture.adoc b/src/main/asciidoc/_chapters/architecture.adoc
index 740b585..e6a71f1 100644
--- a/src/main/asciidoc/_chapters/architecture.adoc
+++ b/src/main/asciidoc/_chapters/architecture.adoc
@@ -1495,7 +1495,7 @@ The minimum flush unit is per region, not at individual MemStore level.
* The `RegionScanner` object contains a list of `StoreScanner` objects, one per column family.
* Each `StoreScanner` object further contains a list of `StoreFileScanner` objects, corresponding to each StoreFile and HFile of the corresponding column family, and a list of `KeyValueScanner` objects for the MemStore.
* The two lists are merged into one, which is sorted in ascending order with the scan object for the MemStore at the end of the list.
-* When a `StoreFileScanner` object is constructed, it is associated with a `MultiVersionConsistencyControl` read point, which is the current `memstoreTS`, filtering out any new updates beyond the read point.
+* When a `StoreFileScanner` object is constructed, it is associated with a `MultiVersionConcurrencyControl` read point, which is the current `memstoreTS`, filtering out any new updates beyond the read point.
[[hfile]]
==== StoreFile (HFile)