You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/09/30 20:48:59 UTC
[3/5] hbase git commit: HBASE-14465 Backport 'Allow rowlock to be
reader/write' to branch-1
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 454b9cc..0857fdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -631,7 +631,7 @@ public class HStore implements Store {
// readers might pick it up. This assumes that the store is not getting any writes (otherwise
// in-flight transactions might be made visible)
if (!toBeAddedFiles.isEmpty()) {
- region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
+ region.getMVCC().advanceTo(this.getMaxSequenceId());
}
// notify scanners, close file readers, and recompute store size
@@ -1288,7 +1288,7 @@ public class HStore implements Store {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
- this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
+ this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 2d65387..00f349e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,239 +18,204 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.mortbay.log.Log;
/**
- * Manages the read/write consistency within memstore. This provides
- * an interface for readers to determine what entries to ignore, and
- * a mechanism for writers to obtain new write numbers, then "commit"
+ * Manages the read/write consistency. This provides an interface for readers to determine what
+ * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
* the new writes for readers to read (thus forming atomic transactions).
*/
@InterfaceAudience.Private
public class MultiVersionConcurrencyControl {
- private static final long NO_WRITE_NUMBER = 0;
- private volatile long memstoreRead = 0;
+ final AtomicLong readPoint = new AtomicLong(0);
+ final AtomicLong writePoint = new AtomicLong(0);
private final Object readWaiters = new Object();
+ /**
+ * Represents no value, or not set.
+ */
+ public static final long NONE = -1;
// This is the pending queue of writes.
- private final LinkedList<WriteEntry> writeQueue =
- new LinkedList<WriteEntry>();
+ //
+ // TODO(eclark): Should this be an array of fixed size to
+ // reduce the number of allocations on the write path?
+ // This could be equal to the number of handlers + a small number.
+ // TODO: St.Ack 20150903 Sounds good to me.
+ private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
- /**
- * Default constructor. Initializes the memstoreRead/Write points to 0.
- */
public MultiVersionConcurrencyControl() {
+ super();
}
/**
- * Initializes the memstoreRead/Write points appropriately.
- * @param startPoint
+ * Construct and set both the read point and the write point to <code>startPoint</code>.
*/
- public void initialize(long startPoint) {
- synchronized (writeQueue) {
- writeQueue.clear();
- memstoreRead = startPoint;
- }
+ public MultiVersionConcurrencyControl(long startPoint) {
+ tryAdvanceTo(startPoint, NONE);
}
/**
- *
- * @param initVal The value we used initially and expected it'll be reset later
- * @return WriteEntry instance.
+ * Step the MVCC forward on to a new read/write basis; no-op if the write point is already there.
+ * @param newStartPoint Point to move read and write points to.
*/
- WriteEntry beginMemstoreInsert() {
- return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+ public void advanceTo(long newStartPoint) {
+ while (true) {
+ long seqId = this.getWritePoint();
+ if (seqId >= newStartPoint) break;
+ if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
+ }
}
/**
- * Get a mvcc write number before an actual one(its log sequence Id) being assigned
- * @param sequenceId
- * @return long a faked write number which is bigger enough not to be seen by others before a real
- * one is assigned
+ * Step the MVCC forward on to a new read/write basis.
+ * @param newStartPoint Point to move read and write points to.
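+ * @param expected If not -1 ({@link #NONE}), the current read point must equal this value.
+ * @return Returns false if <code>expected</code> is not {@link #NONE} and not equal to the
+ * current <code>readPoint</code>, or if <code>newStartPoint</code> is less than the current
+ * <code>readPoint</code>; throws RuntimeException if the read and write points differ.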
*/
- public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
- // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
- // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
- // because each handler could increment sequence num twice and max concurrent in-flight
- // transactions is the number of RPC handlers.
- // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
- // changes touch same row key.
- // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
- // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
- // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
- return sequenceId.incrementAndGet() + 1000000000;
+ boolean tryAdvanceTo(long newStartPoint, long expected) {
+ synchronized (writeQueue) {
+ long currentRead = this.readPoint.get();
+ long currentWrite = this.writePoint.get();
+ if (currentRead != currentWrite) {
+ throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
+ ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
+ }
+ if (expected != NONE && expected != currentRead) {
+ return false;
+ }
+
+ if (newStartPoint < currentRead) {
+ return false;
+ }
+
+ readPoint.set(newStartPoint);
+ writePoint.set(newStartPoint);
+ }
+ return true;
}
/**
- * This function starts a MVCC transaction with current region's log change sequence number. Since
- * we set change sequence number when flushing current change to WAL(late binding), the flush
- * order may differ from the order to start a MVCC transaction. For example, a change begins a
- * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
- * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
- * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
- * big number is safe because we only need it to prevent current change being seen and the number
- * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
- * for MVCC to align with flush sequence.
- * @param curSeqNum
- * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+ * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
+ * to our queue of ongoing writes. Return this WriteEntry instance.
+ * To complete the write transaction and wait for it to be visible, call
+ * {@link #completeAndWait(WriteEntry)}. If the write failed, call
+ * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
+ * transaction.
+ * @see #complete(WriteEntry)
+ * @see #completeAndWait(WriteEntry)
*/
- public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
- WriteEntry e = new WriteEntry(curSeqNum);
+ public WriteEntry begin() {
synchronized (writeQueue) {
+ long nextWriteNumber = writePoint.incrementAndGet();
+ WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
/**
- * Complete a {@link WriteEntry} that was created by
- * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
- * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
- * visible to MVCC readers.
- * @throws IOException
- */
- public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
- throws IOException {
- if(e == null) return;
- if (seqId != null) {
- e.setWriteNumber(seqId.getSequenceId());
- } else {
- // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
- // function beginMemstoreInsertWithSeqNum in case of failures
- e.setWriteNumber(NO_WRITE_NUMBER);
- }
- waitForPreviousTransactionsComplete(e);
- }
-
- /**
- * Cancel a write insert that failed.
- * Removes the write entry without advancing read point or without interfering with write
- * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
- * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
- * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
- * it as for special handling).
- * @param writeEntry Failed attempt at write. Does cleanup.
+ * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
+ * to complete.
*/
- public void cancelMemstoreInsert(WriteEntry writeEntry) {
- // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
- // readpoint and gets my little writeEntry completed and removed from queue of outstanding
- // events which seems right. St.Ack 20150901.
- writeEntry.setWriteNumber(NO_WRITE_NUMBER);
- advanceMemstore(writeEntry);
+ public void await() {
+ // Add a write and then wait on reads to catch up to it.
+ completeAndWait(begin());
}
/**
- * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
- * end of this call, the global read point is at least as large as the write point of the passed
- * in WriteEntry. Thus, the write is visible to MVCC readers.
+ * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
+ * read point catches up to our write.
+ *
+ * At the end of this call, the global read point is at least as large as the write point
+ * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
*/
- public void completeMemstoreInsert(WriteEntry e) {
- waitForPreviousTransactionsComplete(e);
+ public void completeAndWait(WriteEntry e) {
+ complete(e);
+ waitForRead(e);
}
/**
- * Mark the {@link WriteEntry} as complete and advance the read point as
- * much as possible.
+ * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
+ * Call this even if the write has FAILED (AFTER backing out the write transaction
+ * changes completely) so we can clean up the outstanding transaction.
*
* How much is the read point advanced?
- * Let S be the set of all write numbers that are completed and where all previous write numbers
- * are also completed. Then, the read point is advanced to the supremum of S.
+ *
+ * Let S be the set of completed write numbers whose predecessors in the queue are also
+ * completed. Set the read point to the highest numbered write of S.
+ *
+ * @param writeEntry The entry handed out by {@link #begin()}.
*
- * @param e
* @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
*/
- boolean advanceMemstore(WriteEntry e) {
- long nextReadValue = -1;
+ public boolean complete(WriteEntry writeEntry) {
synchronized (writeQueue) {
- e.markCompleted();
+ writeEntry.markCompleted();
+ long nextReadValue = NONE;
+ boolean ranOnce = false;
while (!writeQueue.isEmpty()) {
+ ranOnce = true;
WriteEntry queueFirst = writeQueue.getFirst();
+
+ if (nextReadValue > 0) {
+ if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+ throw new RuntimeException("Invariant in complete violated, nextReadValue="
+ + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
+ }
+ }
+
if (queueFirst.isCompleted()) {
- // Using Max because Edit complete in WAL sync order not arriving order
- nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+ nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
} else {
break;
}
}
- if (nextReadValue > memstoreRead) {
- memstoreRead = nextReadValue;
+ if (!ranOnce) {
+ throw new RuntimeException("There is no first!");
}
- // notify waiters on writeQueue before return
- writeQueue.notifyAll();
- }
-
- if (nextReadValue > 0) {
- synchronized (readWaiters) {
- readWaiters.notifyAll();
- }
- }
-
- if (memstoreRead >= e.getWriteNumber()) {
- return true;
- }
- return false;
- }
-
- /**
- * Advances the current read point to be given seqNum if it is smaller than
- * that.
- */
- void advanceMemstoreReadPointIfNeeded(long seqNum) {
- synchronized (writeQueue) {
- if (this.memstoreRead < seqNum) {
- memstoreRead = seqNum;
+ if (nextReadValue > 0) {
+ synchronized (readWaiters) {
+ readPoint.set(nextReadValue);
+ readWaiters.notifyAll();
+ }
}
+ return readPoint.get() >= writeEntry.getWriteNumber();
}
}
/**
- * Wait for all previous MVCC transactions complete
+ * Wait for the global readPoint to advance up to the passed in write entry number.
*/
- public void waitForPreviousTransactionsComplete() {
- WriteEntry w = beginMemstoreInsert();
- waitForPreviousTransactionsComplete(w);
- }
-
- public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+ void waitForRead(WriteEntry e) {
boolean interrupted = false;
- WriteEntry w = waitedEntry;
-
- try {
- WriteEntry firstEntry = null;
- do {
- synchronized (writeQueue) {
- // writeQueue won't be empty at this point, the following is just a safety check
- if (writeQueue.isEmpty()) {
- break;
- }
- firstEntry = writeQueue.getFirst();
- if (firstEntry == w) {
- // all previous in-flight transactions are done
- break;
- }
- try {
- writeQueue.wait(0);
- } catch (InterruptedException ie) {
- // We were interrupted... finish the loop -- i.e. cleanup --and then
- // on our way out, reset the interrupt flag.
- interrupted = true;
- break;
- }
+ int count = 0;
+ synchronized (readWaiters) {
+ while (readPoint.get() < e.getWriteNumber()) {
+ if (count % 100 == 0 && count > 0) {
+ Log.warn("STUCK: " + this);
+ }
+ count++;
+ try {
+ readWaiters.wait(10);
+ } catch (InterruptedException ie) {
+ // We were interrupted... keep waiting for the read point to catch up and then,
+ // on our way out, reset the interrupt flag.
+ interrupted = true;
}
- } while (firstEntry != null);
- } finally {
- if (w != null) {
- advanceMemstore(w);
}
}
if (interrupted) {
@@ -258,28 +223,60 @@ public class MultiVersionConcurrencyControl {
}
}
- public long memstoreReadPoint() {
- return memstoreRead;
+ @VisibleForTesting
+ public String toString() {
+ StringBuffer sb = new StringBuffer(256);
+ sb.append("readPoint=");
+ sb.append(this.readPoint.get());
+ sb.append(", writePoint=");
+ sb.append(this.writePoint);
+ synchronized (this.writeQueue) {
+ for (WriteEntry we: this.writeQueue) {
+ sb.append(", [");
+ sb.append(we);
+ sb.append("]");
+ }
+ }
+ return sb.toString();
+ }
+
+ public long getReadPoint() {
+ return readPoint.get();
+ }
+
+ @VisibleForTesting
+ public long getWritePoint() {
+ return writePoint.get();
}
+ /**
+ * A write number, handed out at the start of a write transaction, plus whether the write has
+ * completed. Every WriteEntry created must be completed via mvcc#complete or #completeAndWait.
+ */
+ @InterfaceAudience.Private
public static class WriteEntry {
- private long writeNumber;
- private volatile boolean completed = false;
+ private final long writeNumber;
+ private boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
+
void markCompleted() {
this.completed = true;
}
+
boolean isCompleted() {
return this.completed;
}
- long getWriteNumber() {
+
+ public long getWriteNumber() {
return this.writeNumber;
}
- void setWriteNumber(long val){
- this.writeNumber = val;
+
+ @Override
+ public String toString() {
+ return this.writeNumber + ", " + this.completed;
}
}
@@ -287,5 +284,4 @@ public class MultiVersionConcurrencyControl {
ClassSize.OBJECT +
2 * Bytes.SIZEOF_LONG +
2 * ClassSize.REFERENCE);
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 1479668..7e3465f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -62,7 +61,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -112,7 +110,7 @@ import com.lmax.disruptor.dsl.ProducerType;
*
* <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
* org.apache.hadoop.fs.Path)}.
- *
+ *
* <h2>Failure Semantic</h2>
* If an exception on append or sync, roll the WAL because the current WAL is now a lame duck;
* any more appends or syncs will fail also with the same original exception. If we have made
@@ -142,7 +140,7 @@ public class FSHLog implements WAL {
// Calls to append now also wait until the append has been done on the consumer side of the
// disruptor. We used to not wait but it makes the implemenation easier to grok if we have
// the region edit/sequence id after the append returns.
- //
+ //
// TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend
// once only? Probably hard given syncs take way longer than an append.
//
@@ -233,7 +231,7 @@ public class FSHLog implements WAL {
private final String logFilePrefix;
/**
- * Suffix included on generated wal file names
+ * Suffix included on generated wal file names
*/
private final String logFileSuffix;
@@ -250,13 +248,14 @@ public class FSHLog implements WAL {
protected final Configuration conf;
/** Listeners that are called on WAL events. */
- private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
+ private final List<WALActionsListener> listeners =
+ new CopyOnWriteArrayList<WALActionsListener>();
@Override
public void registerWALActionsListener(final WALActionsListener listener) {
this.listeners.add(listener);
}
-
+
@Override
public boolean unregisterWALActionsListener(final WALActionsListener listener) {
return this.listeners.remove(listener);
@@ -611,7 +610,7 @@ public class FSHLog implements WAL {
/**
* Tell listeners about pre log roll.
- * @throws IOException
+ * @throws IOException
*/
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
throws IOException {
@@ -624,7 +623,7 @@ public class FSHLog implements WAL {
/**
* Tell listeners about post log roll.
- * @throws IOException
+ * @throws IOException
*/
private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
throws IOException {
@@ -1052,27 +1051,11 @@ public class FSHLog implements WAL {
}
}
- /**
- * @param now
- * @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
- * @param tableName
- * @param clusterIds that have consumed the change
- * @return New log key.
- */
- @SuppressWarnings("deprecation")
- protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
- long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
- }
-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
@Override
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
- final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
- final List<Cell> memstoreCells) throws IOException {
+ final WALEdit edits, final boolean inMemstore) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@@ -1086,9 +1069,9 @@ public class FSHLog implements WAL {
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
- // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
- // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
+ // edit with its edit/sequence id.
+ // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
+ entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
@@ -1115,9 +1098,9 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
-
+
/**
- * UPDATE!
+ * UPDATE!
* @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
* we will put the result of the actual hdfs sync call as the result.
* @param sequence The sequence number on the ring buffer when this thread was set running.
@@ -1165,7 +1148,7 @@ public class FSHLog implements WAL {
// This function releases one sync future only.
return 1;
}
-
+
/**
* Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
* @param currentSequence
@@ -1569,7 +1552,7 @@ public class FSHLog implements WAL {
* 'safe point' while the orchestrating thread does some work that requires the first thread
* paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
* thread.
- *
+ *
* <p>Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until
* Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
* Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused,
@@ -1577,7 +1560,7 @@ public class FSHLog implements WAL {
* it flags B and then Thread A and Thread B continue along on their merry way. Pause and
* signalling 'zigzags' between the two participating threads. We use two latches -- one the
* inverse of the other -- pausing and signaling when states are achieved.
- *
+ *
* <p>To start up the drama, Thread A creates an instance of this class each time it would do
* this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
* only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
@@ -1599,7 +1582,7 @@ public class FSHLog implements WAL {
* Latch to wait on. Will be released when we can proceed.
*/
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
-
+
/**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@@ -1608,7 +1591,7 @@ public class FSHLog implements WAL {
* @throws InterruptedException
* @throws ExecutionException
* @return The passed <code>syncFuture</code>
- * @throws FailedSyncBeforeLogCloseException
+ * @throws FailedSyncBeforeLogCloseException
*/
SyncFuture waitSafePoint(final SyncFuture syncFuture)
throws InterruptedException, FailedSyncBeforeLogCloseException {
@@ -1620,7 +1603,7 @@ public class FSHLog implements WAL {
}
return syncFuture;
}
-
+
/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1858,9 +1841,8 @@ public class FSHLog implements WAL {
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
-
- // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
- // region sequence id only, a region edit/sequence id that is not associated with an actual
+ // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
+ // region sequence id only, a region edit/sequence id that is not associated with an actual
// edit. It has to go through all the rigmarole to be sure we have the right ordering.
if (entry.getEdit().isEmpty()) {
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index a768660..7f3eb61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
@@ -51,23 +50,18 @@ class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
- private final transient AtomicLong regionSequenceIdReference;
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
- private final transient List<Cell> memstoreCells;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
- final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
- final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
+ final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
super(key, edit);
- this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
- this.memstoreCells = memstoreCells;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
ArrayList<Cell> cells = this.getEdit().getCells();
@@ -111,24 +105,30 @@ class FSWALEntry extends Entry {
}
/**
- * Stamp this edit with a region edit/sequence id.
- * Call when safe to do so: i.e. the context is such that the increment on the passed in
- * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the
- * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this
- * method to be called.
- * @return The region edit/sequence id we set for this edit.
+ * Here is where a WAL edit gets its sequenceid.
+ * @return The sequenceid we stamped on this edit.
* @throws IOException
- * @see #getRegionSequenceId()
*/
long stampRegionSequenceId() throws IOException {
- long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
- if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
- for (Cell cell : this.memstoreCells) {
- CellUtil.setSequenceId(cell, regionSequenceId);
+ long regionSequenceId = WALKey.NO_SEQUENCE_ID;
+ MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
+ MultiVersionConcurrencyControl.WriteEntry we = null;
+
+ if (mvcc != null) {
+ we = mvcc.begin();
+ regionSequenceId = we.getWriteNumber();
+ }
+
+ if (!this.getEdit().isReplay() && inMemstore) {
+ for (Cell c:getEdit().getCells()) {
+ CellUtil.setSequenceId(c, regionSequenceId);
}
}
+
+ // This has to stay in this order
WALKey key = getKey();
key.setLogSeqNum(regionSequenceId);
+ key.setWriteEntry(we);
return regionSequenceId;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 5218981..28141a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.Writable;
@@ -69,10 +70,18 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename);
}
+ @VisibleForTesting
public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
super(encodedRegionName, tablename, now);
}
+ public HLogKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ final MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, mvcc);
+ }
+
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
@@ -86,9 +95,16 @@ public class HLogKey extends WALKey implements Writable {
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ public HLogKey(
+ final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -104,9 +120,14 @@ public class HLogKey extends WALKey implements Writable {
* @param nonceGroup
* @param nonce
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ public HLogKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ final MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -122,8 +143,8 @@ public class HLogKey extends WALKey implements Writable {
* @param nonce
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
- long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce);
+ long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce, mvcc);
}
/**
@@ -141,7 +162,8 @@ public class HLogKey extends WALKey implements Writable {
Compressor.writeCompressed(this.encodedRegionName, 0,
this.encodedRegionName.length, out,
compressionContext.regionDict);
- Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
+ Compressor.writeCompressed(this.tablename.getName(), 0,
+ this.tablename.getName().length, out,
compressionContext.tableDict);
}
out.writeLong(this.logSeqNum);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
index cb89346..f7ae208 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
/**
* An HLogKey specific to WalEdits coming from replay.
@@ -32,13 +33,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public class ReplayHLogKey extends HLogKey {
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
}
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index be39873..e41e1c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -37,7 +37,8 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
+ HBaseInterfaceAudience.CONFIG})
public class SequenceFileLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
@@ -273,8 +274,10 @@ public class SequenceFileLogReader extends ReaderBase {
end = fEnd.getLong(this.reader);
} catch(NoSuchFieldException nfe) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(nfe);
} catch(IllegalAccessException iae) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(iae);
} catch(Exception e) {
/* All other cases. Should we handle it more aggressively? */
LOG.warn("Unexpected exception when accessing the end field", e);
@@ -293,8 +296,10 @@ public class SequenceFileLogReader extends ReaderBase {
.initCause(ioe);
} catch(NoSuchMethodException nfe) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(nfe);
} catch(IllegalAccessException iae) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(iae);
} catch(Exception e) {
/* All other cases. Should we handle it more aggressively? */
LOG.warn("Unexpected exception when accessing the end field", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d2119d7..5e53e41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -73,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
* where, the WALEdit is serialized as:
* <-1, # of edits, <KeyValue>, <KeyValue>, ... >
* For example:
- * <-1, 3, <Keyvalue-for-edit-c1>, <KeyValue-for-edit-c2>, <KeyValue-for-edit-c3>>
+ * <-1, 3, <KV-for-edit-c1>, <KV-for-edit-c2>, <KV-for-edit-c3>>
*
* The -1 marker is just a special way of being backward compatible with
* an old WAL which would have contained a single <KeyValue>.
@@ -104,6 +104,9 @@ public class WALEdit implements Writable, HeapSize {
public static final WALEdit EMPTY_WALEDIT = new WALEdit();
// Only here for legacy writable deserialization
+ /**
+ * @deprecated Legacy
+ */
@Deprecated
private NavigableMap<byte[], Integer> scopes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 399623f..c89a466 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,20 +20,17 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -48,36 +45,34 @@ import com.google.protobuf.TextFormat;
public class WALUtil {
private static final Log LOG = LogFactory.getLog(WALUtil.class);
+ private WALUtil() {
+ // Shut down construction of this class.
+ }
+
/**
* Write the marker that a compaction has succeeded and is about to be committed.
* This provides info to the HMaster to allow it to recover the compaction if
* this regionserver dies in the middle (This part is not yet implemented). It also prevents
* the compaction from finishing if this regionserver has already lost its lease on the log.
- * @param sequenceId Used by WAL to get sequence Id for the waledit.
+ * @param mvcc Used by WAL to get sequence Id for the waledit.
*/
- public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
- TableName tn = TableName.valueOf(c.getTableName().toByteArray());
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
- log.sync();
+ public static long writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+ final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ long trx = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc, true);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
+ return trx;
}
/**
* Write a flush marker indicating a start / abort or a complete of a region flush
*/
- public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
- TableName tn = TableName.valueOf(f.getTableName().toByteArray());
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
- null);
- if (sync) log.sync(trx);
+ public static long writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+ final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ long trx = writeMarker(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@@ -87,14 +82,10 @@ public class WALUtil {
/**
* Write a region open marker indicating that the region is opened
*/
- public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
- TableName tn = TableName.valueOf(r.getTableName().toByteArray());
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
- sequenceId, false, null);
- log.sync(trx);
+ public static long writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+ final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ long trx = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, true);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@@ -106,35 +97,40 @@ public class WALUtil {
*
* @param wal The log to write into.
* @param htd A description of the table that we are bulk loading into.
- * @param info A description of the region in the table that we are bulk loading into.
- * @param descriptor A protocol buffers based description of the client's bulk loading request
- * @param sequenceId The current sequenceId in the log at the time when we were to write the
- * bulk load marker.
+ * @param hri A description of the region in the table that we are bulk loading into.
+ * @param desc A protocol buffers based description of the client's bulk loading request
* @return txid of this transaction or if nothing to do, the last txid
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
- public static long writeBulkLoadMarkerAndSync(final WAL wal,
- final HTableDescriptor htd,
- final HRegionInfo info,
- final WALProtos.BulkLoadDescriptor descriptor,
- final AtomicLong sequenceId) throws IOException {
- TableName tn = info.getTable();
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+ public static long writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
+ final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
+ final MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ long trx = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, true);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
+ }
+ return trx;
+ }
+ private static long writeMarker(final WAL wal, final HTableDescriptor htd, final HRegionInfo hri,
+ final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
+ throws IOException {
+ // TODO: Pass in current time to use?
+ WALKey key =
+ new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
// Add it to the log but the false specifies that we don't need to add it to the memstore
- long trx = wal.append(htd,
- info,
- key,
- WALEdit.createBulkLoadEvent(info, descriptor),
- sequenceId,
- false,
- new ArrayList<Cell>());
- wal.sync(trx);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(descriptor));
+ long trx = MultiVersionConcurrencyControl.NONE;
+ try {
+ trx = wal.append(htd, hri, key, edit, false);
+ if (sync) wal.sync(trx);
+ } finally {
+ // If you get hung here, is it a real WAL or a mocked WAL? If the latter, you need to
+ // trip the latch that is inside in getWriteEntry up in your mock. See down in the append
+ // called from onEvent in FSHLog.
+ MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry();
+ if (mvcc != null && we != null) mvcc.complete(we);
}
return trx;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
index f628cee..84d6128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
@@ -55,7 +55,7 @@ public class HashedBytes {
if (obj == null || getClass() != obj.getClass())
return false;
HashedBytes other = (HashedBytes) obj;
- return Arrays.equals(bytes, other.bytes);
+ return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 328793b..43738be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -155,7 +155,7 @@ class DisabledWALProvider implements WALProvider {
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
+ boolean inMemstore) {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 4844487..d2b336e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,17 +21,13 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
// imports we use from yet-to-be-moved regionsever.wal
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@@ -114,19 +110,16 @@ public interface WAL {
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
- * @param htd used to give scope for replication TODO refactor out in favor of table name and info
- * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
- * source of its incrementing edits sequence id. Inside in this call we will increment it and
- * attach the sequence to the edit we apply the WAL.
+ * @param htd used to give scope for replication TODO refactor out in favor of table name and
+ * info
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
- * @param memstoreKVs list of KVs added into memstore
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
+ boolean inMemstore)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 74284e0..05acd72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -68,13 +69,55 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
*
* Note that protected members marked @InterfaceAudience.Private are only protected
* to support the legacy HLogKey class, which is in a different package.
+ *
+ * <p>
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into WALEntry.
+// TODO: Cleanup. We have logSeqNum and then WriteEntry, both are sequence id'ing. Fix.
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class WALKey implements SequenceId, Comparable<WALKey> {
private static final Log LOG = LogFactory.getLog(WALKey.class);
+ @InterfaceAudience.Private // For internal use only.
+ public MultiVersionConcurrencyControl getMvcc() {
+ return mvcc;
+ }
+
+ /**
+ * Will block until a write entry has been assigned by they WAL subsystem.
+ * @return A WriteEntry gotten from local WAL subsystem. Must be completed by calling
+ * mvcc#complete or mvcc#completeAndWait.
+ * @throws InterruptedIOException
+ * @see
+ * #setWriteEntry(org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry)
+ */
+ @InterfaceAudience.Private // For internal use only.
+ public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
+ try {
+ this.seqNumAssignedLatch.await();
+ } catch (InterruptedException ie) {
+ // If interrupted... clear out our entry else we can block up mvcc.
+ MultiVersionConcurrencyControl mvcc = getMvcc();
+ LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
+ if (mvcc != null) {
+ if (this.writeEntry != null) {
+ mvcc.complete(this.writeEntry);
+ }
+ }
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ return this.writeEntry;
+ }
+
+ @InterfaceAudience.Private // For internal use only.
+ public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
+ this.writeEntry = writeEntry;
+ this.seqNumAssignedLatch.countDown();
+ }
+
// should be < 0 (@see HLogKey#readFields(DataInput))
// version 2 supports WAL compression
// public members here are only public because of HLogKey
@@ -151,7 +194,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
- static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
+ private MultiVersionConcurrencyControl mvcc;
+ private MultiVersionConcurrencyControl.WriteEntry writeEntry;
+ public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
// visible for deprecated HLogKey
@InterfaceAudience.Private
@@ -159,16 +204,17 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
- new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
@VisibleForTesting
- public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
public WALKey(final byte[] encodedRegionName, final TableName tablename) {
@@ -176,8 +222,28 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
- EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ init(encodedRegionName,
+ tablename,
+ NO_SEQUENCE_ID,
+ now,
+ EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ null);
+ }
+
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName,
+ tablename,
+ NO_SEQUENCE_ID,
+ now,
+ EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ mvcc);
}
/**
@@ -187,15 +253,21 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
- * @param tablename - name of table
- * @param logSeqNum - log sequence number
- * @param now Time at which this edit was written.
- * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -204,17 +276,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
- * @param now Time at which this edit was written.
- * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup
* @param nonce
+ * @param mvcc mvcc control used to generate sequence numbers and control read/write points
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup,
+ final long nonce, final MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -223,21 +296,37 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param logSeqNum
* @param nonceGroup
* @param nonce
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
- long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
- EMPTY_UUIDS, nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ long nonceGroup,
+ long nonce,
+ final MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName,
+ tablename,
+ logSeqNum,
+ EnvironmentEdgeManager.currentTime(),
+ EMPTY_UUIDS,
+ nonceGroup,
+ nonce,
+ mvcc);
}
@InterfaceAudience.Private
- protected void init(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ protected void init(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@@ -245,6 +334,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.tablename = tablename;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
+ this.mvcc = mvcc;
}
/**
@@ -270,15 +360,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
/**
- * Allow that the log sequence id to be set post-construction and release all waiters on assigned
- * sequence number.
+ * Allow that the log sequence id to be set post-construction
* Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
* @param sequence
*/
@InterfaceAudience.Private
public void setLogSeqNum(final long sequence) {
this.logSeqNum = sequence;
- this.seqNumAssignedLatch.countDown();
+
}
/**
@@ -492,21 +581,22 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.encodedRegionName = encodedRegionName;
}
- public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
- throws IOException {
- org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
+ WALCellCodec.ByteStringCompressor compressor) throws IOException {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
- compressionContext.regionDict));
+ compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
- compressionContext.tableDict));
+ compressionContext.tableDict));
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
- if(this.origLogSeqNum > 0) {
+ if (this.origLogSeqNum > 0) {
builder.setOrigSequenceNumber(this.origLogSeqNum);
}
if (this.nonce != HConstants.NO_NONCE) {
@@ -532,8 +622,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return builder;
}
- public void readFieldsFromPb(
- org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+ public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
+ WALCellCodec.ByteStringUncompressor uncompressor)
+ throws IOException {
if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress(
walKey.getEncodedRegionName(), compressionContext.regionDict);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 7fed610..98882ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2286,7 +2286,7 @@ public class WALSplitter {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
- clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
+ clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
logEntry.setFirst(key);
logEntry.setSecond(val);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
index d7a68e3..ea833dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
@@ -90,7 +90,7 @@ public class TestFullLogReconstruction {
*/
@Test (timeout=300000)
public void testReconstruction() throws Exception {
- HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
+ Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
// Load up the table with simple rows and count them
int initialCount = TEST_UTIL.loadTable(table, FAMILY);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 41e7ec5..22531c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,10 +46,10 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -224,8 +223,7 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompaction() throws Exception {
- doTest(BlockCompactionsInPrepRegion.class, false);
- doTest(BlockCompactionsInPrepRegion.class, true);
+ doTest(BlockCompactionsInPrepRegion.class);
}
/**
@@ -236,13 +234,11 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
- doTest(BlockCompactionsInCompletionRegion.class, false);
- doTest(BlockCompactionsInCompletionRegion.class, true);
+ doTest(BlockCompactionsInCompletionRegion.class);
}
- public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
+ public void doTest(Class<?> regionClass) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
- c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
@@ -283,7 +279,7 @@ public class TestIOFencing {
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
- oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+ oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
@@ -354,4 +350,4 @@ public class TestIOFencing {
TEST_UTIL.shutdownMiniCluster();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 24f7190..f5e4026 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -383,7 +383,6 @@ public class TestReplicasClient {
}
}
-
@Test
public void testFlushTable() throws Exception {
openRegion(hriSecondary);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 53c234e..fb0d843 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -139,6 +139,4 @@ public class TestRegionObserverStacking extends TestCase {
assertTrue(idA < idB);
assertTrue(idB < idC);
}
-
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 19b45a7..95e77a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,7 +29,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -186,7 +186,6 @@ public class TestWALObserver {
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
- final AtomicLong sequenceId = new AtomicLong(0);
// TEST_FAMILY[0] shall be removed from WALEdit.
// TEST_FAMILY[1] value shall be changed.
@@ -235,7 +234,7 @@ public class TestWALObserver {
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- edit, sequenceId, true, null);
+ edit, true);
log.sync(txid);
// the edit shall have been change now by the coprocessor.
@@ -271,7 +270,7 @@ public class TestWALObserver {
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
final HRegionInfo hri = new HRegionInfo(tableName, null, null);
- final AtomicLong sequenceId = new AtomicLong(0);
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
@@ -298,7 +297,7 @@ public class TestWALObserver {
final int countPerFamily = 5;
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
}
LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -322,7 +321,7 @@ public class TestWALObserver {
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
- final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
+ final long txid = wal.append(htd, hri, legacyKey, edit, true);
wal.sync(txid);
LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -347,7 +346,7 @@ public class TestWALObserver {
public void testEmptyWALEditAreNotSeen() throws Exception {
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
- final AtomicLong sequenceId = new AtomicLong(0);
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
try {
@@ -359,8 +358,9 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
- long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- new WALEdit(), sequenceId, true, null);
+ long txid = log.append(htd, hri,
+ new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+ new WALEdit(), true);
log.sync(txid);
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
@@ -379,7 +379,7 @@ public class TestWALObserver {
// ultimately called by HRegion::initialize()
TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
- final AtomicLong sequenceId = new AtomicLong(0);
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
// final HRegionInfo hri =
// createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
// final HRegionInfo hri1 =
@@ -403,10 +403,9 @@ public class TestWALObserver {
// for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
}
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
- true, null);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
// sync to fs.
wal.sync();
@@ -526,7 +525,7 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
+ final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
String familyStr = Bytes.toString(family);
long txid = -1;
for (int j = 0; j < count; j++) {
@@ -537,7 +536,7 @@ public class TestWALObserver {
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
- ee.currentTime()), edit, sequenceId, true, null);
+ ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {
wal.sync(txid);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index dd47325..0ceae46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -31,8 +31,8 @@ import org.junit.experimental.categories.Category;
public class TestHLogRecordReader extends TestWALRecordReader {
@Override
- protected WALKey getWalKey(final long sequenceid) {
- return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+ protected WALKey getWalKey(final long time) {
+ return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 89f0e7a..2423d03 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -74,6 +75,7 @@ public class TestWALRecordReader {
private static final byte [] value = Bytes.toBytes("value");
private static HTableDescriptor htd;
private static Path logDir;
+ protected MultiVersionConcurrencyControl mvcc;
private static String getName() {
return "TestWALRecordReader";
@@ -81,6 +83,7 @@ public class TestWALRecordReader {
@Before
public void setUp() throws Exception {
+ mvcc = new MultiVersionConcurrencyControl();
FileStatus[] entries = fs.listStatus(hbaseDir);
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
@@ -123,13 +126,11 @@ public class TestWALRecordReader {
// being millisecond based.
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
- final AtomicLong sequenceId = new AtomicLong(0);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+ log.append(htd, info, getWalKey(ts), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts+1), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@@ -140,12 +141,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts1+1), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts1+2), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@@ -187,8 +186,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
- long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
- null);
+ long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -198,8 +196,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
- txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
- null);
+ txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@@ -238,8 +235,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
- protected WALKey getWalKey(final long sequenceid) {
- return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+ protected WALKey getWalKey(final long time) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
}
protected WALRecordReader getReader() {