You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/09/23 02:23:21 UTC
[3/5] hbase git commit: HBASE-12751 Allow RowLock to be reader writer
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 2d65387..d101f7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,239 +18,198 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
- * Manages the read/write consistency within memstore. This provides
- * an interface for readers to determine what entries to ignore, and
- * a mechanism for writers to obtain new write numbers, then "commit"
+ * Manages the read/write consistency. This provides an interface for readers to determine what
+ * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
* the new writes for readers to read (thus forming atomic transactions).
*/
@InterfaceAudience.Private
public class MultiVersionConcurrencyControl {
- private static final long NO_WRITE_NUMBER = 0;
- private volatile long memstoreRead = 0;
+ final AtomicLong readPoint = new AtomicLong(0);
+ final AtomicLong writePoint = new AtomicLong(0);
private final Object readWaiters = new Object();
+ /**
+ * Represents no value, or not set.
+ */
+ private static final long NONE = -1;
// This is the pending queue of writes.
- private final LinkedList<WriteEntry> writeQueue =
- new LinkedList<WriteEntry>();
+ //
+ // TODO(eclark): Should this be an array of fixed size to
+ // reduce the number of allocations on the write path?
+ // This could be equal to the number of handlers + a small number.
+ // TODO: St.Ack 20150903 Sounds good to me.
+ private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
- /**
- * Default constructor. Initializes the memstoreRead/Write points to 0.
- */
public MultiVersionConcurrencyControl() {
+ super();
}
/**
- * Initializes the memstoreRead/Write points appropriately.
- * @param startPoint
+ * Construct and advance both the read point and the write point to <code>startPoint</code>.
*/
- public void initialize(long startPoint) {
- synchronized (writeQueue) {
- writeQueue.clear();
- memstoreRead = startPoint;
- }
+ public MultiVersionConcurrencyControl(long startPoint) {
+ tryAdvanceTo(startPoint, NONE);
}
/**
- *
- * @param initVal The value we used initially and expected it'll be reset later
- * @return WriteEntry instance.
+ * Step the MVCC forward on to a new read/write basis.
+ * @param newStartPoint Point to move read and write points to.
*/
- WriteEntry beginMemstoreInsert() {
- return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+ public void advanceTo(long newStartPoint) {
+ while (true) {
+ long seqId = this.getWritePoint();
+ if (seqId >= newStartPoint) break;
+ if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
+ }
}
/**
- * Get a mvcc write number before an actual one(its log sequence Id) being assigned
- * @param sequenceId
- * @return long a faked write number which is bigger enough not to be seen by others before a real
- * one is assigned
+ * Step the MVCC forward on to a new read/write basis.
+ * @param newStartPoint Point to move read and write points to.
+ * @param expected If not -1 (#NONE), the expected current <code>readPoint</code>
+ * @return Returns false if <code>expected</code> is set and is not equal to the
+ * current <code>readPoint</code> or if <code>newStartPoint</code> is less than current
+ * <code>readPoint</code>
*/
- public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
- // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
- // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
- // because each handler could increment sequence num twice and max concurrent in-flight
- // transactions is the number of RPC handlers.
- // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
- // changes touch same row key.
- // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
- // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
- // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
- return sequenceId.incrementAndGet() + 1000000000;
+ boolean tryAdvanceTo(long newStartPoint, long expected) {
+ synchronized (writeQueue) {
+ long currentRead = this.readPoint.get();
+ long currentWrite = this.writePoint.get();
+ if (currentRead != currentWrite) {
+ throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
+ ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
+ }
+ if (expected != NONE && expected != currentRead) {
+ return false;
+ }
+
+ if (newStartPoint < currentRead) {
+ return false;
+ }
+
+ readPoint.set(newStartPoint);
+ writePoint.set(newStartPoint);
+ }
+ return true;
}
/**
- * This function starts a MVCC transaction with current region's log change sequence number. Since
- * we set change sequence number when flushing current change to WAL(late binding), the flush
- * order may differ from the order to start a MVCC transaction. For example, a change begins a
- * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
- * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
- * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
- * big number is safe because we only need it to prevent current change being seen and the number
- * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
- * for MVCC to align with flush sequence.
- * @param curSeqNum
- * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+ * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
+ * to our queue of ongoing writes. Return this WriteEntry instance.
+ * To complete the write transaction and wait for it to be visible, call
+ * {@link #completeAndWait(WriteEntry)}. If the write failed, call
+ * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
+ * transaction.
+ * @see #complete(WriteEntry)
+ * @see #completeAndWait(WriteEntry)
*/
- public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
- WriteEntry e = new WriteEntry(curSeqNum);
+ public WriteEntry begin() {
synchronized (writeQueue) {
+ long nextWriteNumber = writePoint.incrementAndGet();
+ WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
/**
- * Complete a {@link WriteEntry} that was created by
- * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
- * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
- * visible to MVCC readers.
- * @throws IOException
- */
- public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
- throws IOException {
- if(e == null) return;
- if (seqId != null) {
- e.setWriteNumber(seqId.getSequenceId());
- } else {
- // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
- // function beginMemstoreInsertWithSeqNum in case of failures
- e.setWriteNumber(NO_WRITE_NUMBER);
- }
- waitForPreviousTransactionsComplete(e);
- }
-
- /**
- * Cancel a write insert that failed.
- * Removes the write entry without advancing read point or without interfering with write
- * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
- * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
- * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
- * it as for special handling).
- * @param writeEntry Failed attempt at write. Does cleanup.
+ * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
+ * to complete.
*/
- public void cancelMemstoreInsert(WriteEntry writeEntry) {
- // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
- // readpoint and gets my little writeEntry completed and removed from queue of outstanding
- // events which seems right. St.Ack 20150901.
- writeEntry.setWriteNumber(NO_WRITE_NUMBER);
- advanceMemstore(writeEntry);
+ public void await() {
+ // Add a write and then wait on reads to catch up to it.
+ completeAndWait(begin());
}
/**
- * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
- * end of this call, the global read point is at least as large as the write point of the passed
- * in WriteEntry. Thus, the write is visible to MVCC readers.
+ * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
+ * read point catches up to our write.
+ *
+ * At the end of this call, the global read point is at least as large as the write point
+ * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
*/
- public void completeMemstoreInsert(WriteEntry e) {
- waitForPreviousTransactionsComplete(e);
+ public void completeAndWait(WriteEntry e) {
+ complete(e);
+ waitForRead(e);
}
/**
- * Mark the {@link WriteEntry} as complete and advance the read point as
- * much as possible.
+ * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
+ * Call this even if the write has FAILED (AFTER backing out the write transaction
+ * changes completely) so we can clean up the outstanding transaction.
*
* How much is the read point advanced?
- * Let S be the set of all write numbers that are completed and where all previous write numbers
- * are also completed. Then, the read point is advanced to the supremum of S.
+ *
+ * Let S be the set of all write numbers that are completed and where all previous write numbers
+ * are also completed. Set the read point to the highest numbered write of S.
+ *
+ * @param writeEntry
*
- * @param e
* @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
*/
- boolean advanceMemstore(WriteEntry e) {
- long nextReadValue = -1;
+ public boolean complete(WriteEntry writeEntry) {
synchronized (writeQueue) {
- e.markCompleted();
+ writeEntry.markCompleted();
+ long nextReadValue = NONE;
+ boolean ranOnce = false;
while (!writeQueue.isEmpty()) {
+ ranOnce = true;
WriteEntry queueFirst = writeQueue.getFirst();
+
+ if (nextReadValue > 0) {
+ if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+ throw new RuntimeException("Invariant in complete violated, nextReadValue="
+ + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
+ }
+ }
+
if (queueFirst.isCompleted()) {
- // Using Max because Edit complete in WAL sync order not arriving order
- nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+ nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
} else {
break;
}
}
- if (nextReadValue > memstoreRead) {
- memstoreRead = nextReadValue;
+ if (!ranOnce) {
+ throw new RuntimeException("There is no first!");
}
- // notify waiters on writeQueue before return
- writeQueue.notifyAll();
- }
-
- if (nextReadValue > 0) {
- synchronized (readWaiters) {
- readWaiters.notifyAll();
- }
- }
-
- if (memstoreRead >= e.getWriteNumber()) {
- return true;
- }
- return false;
- }
-
- /**
- * Advances the current read point to be given seqNum if it is smaller than
- * that.
- */
- void advanceMemstoreReadPointIfNeeded(long seqNum) {
- synchronized (writeQueue) {
- if (this.memstoreRead < seqNum) {
- memstoreRead = seqNum;
+ if (nextReadValue > 0) {
+ synchronized (readWaiters) {
+ readPoint.set(nextReadValue);
+ readWaiters.notifyAll();
+ }
}
+ return readPoint.get() >= writeEntry.getWriteNumber();
}
}
/**
- * Wait for all previous MVCC transactions complete
+ * Wait for the global readPoint to advance up to the passed in write entry number.
*/
- public void waitForPreviousTransactionsComplete() {
- WriteEntry w = beginMemstoreInsert();
- waitForPreviousTransactionsComplete(w);
- }
-
- public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+ void waitForRead(WriteEntry e) {
boolean interrupted = false;
- WriteEntry w = waitedEntry;
-
- try {
- WriteEntry firstEntry = null;
- do {
- synchronized (writeQueue) {
- // writeQueue won't be empty at this point, the following is just a safety check
- if (writeQueue.isEmpty()) {
- break;
- }
- firstEntry = writeQueue.getFirst();
- if (firstEntry == w) {
- // all previous in-flight transactions are done
- break;
- }
- try {
- writeQueue.wait(0);
- } catch (InterruptedException ie) {
- // We were interrupted... finish the loop -- i.e. cleanup --and then
- // on our way out, reset the interrupt flag.
- interrupted = true;
- break;
- }
+ synchronized (readWaiters) {
+ while (readPoint.get() < e.getWriteNumber()) {
+ try {
+ readWaiters.wait(0);
+ } catch (InterruptedException ie) {
+ // We were interrupted... finish the loop -- i.e. cleanup --and then
+ // on our way out, reset the interrupt flag.
+ interrupted = true;
}
- } while (firstEntry != null);
- } finally {
- if (w != null) {
- advanceMemstore(w);
}
}
if (interrupted) {
@@ -258,34 +217,43 @@ public class MultiVersionConcurrencyControl {
}
}
- public long memstoreReadPoint() {
- return memstoreRead;
+ public long getReadPoint() {
+ return readPoint.get();
}
+ @VisibleForTesting
+ public long getWritePoint() {
+ return writePoint.get();
+ }
+
+ /**
+ * Write number plus completion flag; handed out at the start of a write transaction.
+ * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
+ */
+ @InterfaceAudience.Private
public static class WriteEntry {
- private long writeNumber;
- private volatile boolean completed = false;
+ private final long writeNumber;
+ private boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
+
void markCompleted() {
this.completed = true;
}
+
boolean isCompleted() {
return this.completed;
}
- long getWriteNumber() {
+
+ public long getWriteNumber() {
return this.writeNumber;
}
- void setWriteNumber(long val){
- this.writeNumber = val;
- }
}
public static final long FIXED_SIZE = ClassSize.align(
ClassSize.OBJECT +
2 * Bytes.SIZEOF_LONG +
2 * ClassSize.REFERENCE);
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 1c29827..0e4a585 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -63,7 +62,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -111,7 +109,7 @@ import com.lmax.disruptor.dsl.ProducerType;
*
* <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
* org.apache.hadoop.fs.Path)}.
- *
+ *
* <h2>Failure Semantic</h2>
* If an exception on append or sync, roll the WAL because the current WAL is now a lame duck;
* any more appends or syncs will fail also with the same original exception. If we have made
@@ -141,7 +139,7 @@ public class FSHLog implements WAL {
// Calls to append now also wait until the append has been done on the consumer side of the
// disruptor. We used to not wait but it makes the implemenation easier to grok if we have
// the region edit/sequence id after the append returns.
- //
+ //
// TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend
// once only? Probably hard given syncs take way longer than an append.
//
@@ -232,7 +230,7 @@ public class FSHLog implements WAL {
private final String logFilePrefix;
/**
- * Suffix included on generated wal file names
+ * Suffix included on generated wal file names
*/
private final String logFileSuffix;
@@ -249,13 +247,14 @@ public class FSHLog implements WAL {
protected final Configuration conf;
/** Listeners that are called on WAL events. */
- private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
+ private final List<WALActionsListener> listeners =
+ new CopyOnWriteArrayList<WALActionsListener>();
@Override
public void registerWALActionsListener(final WALActionsListener listener) {
this.listeners.add(listener);
}
-
+
@Override
public boolean unregisterWALActionsListener(final WALActionsListener listener) {
return this.listeners.remove(listener);
@@ -618,7 +617,7 @@ public class FSHLog implements WAL {
/**
* Tell listeners about pre log roll.
- * @throws IOException
+ * @throws IOException
*/
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
throws IOException {
@@ -631,7 +630,7 @@ public class FSHLog implements WAL {
/**
* Tell listeners about post log roll.
- * @throws IOException
+ * @throws IOException
*/
private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
throws IOException {
@@ -1059,27 +1058,11 @@ public class FSHLog implements WAL {
}
}
- /**
- * @param now
- * @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
- * @param tableName
- * @param clusterIds that have consumed the change
- * @return New log key.
- */
- @SuppressWarnings("deprecation")
- protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
- long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
- }
-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
@Override
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
- final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
- final List<Cell> memstoreCells) throws IOException {
+ final WALEdit edits, final boolean inMemstore) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@@ -1093,9 +1076,9 @@ public class FSHLog implements WAL {
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
- // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
- // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
+ // edit with its edit/sequence id.
+ // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
+ entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
@@ -1122,9 +1105,9 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
-
+
/**
- * UPDATE!
+ * UPDATE!
* @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
* we will put the result of the actual hdfs sync call as the result.
* @param sequence The sequence number on the ring buffer when this thread was set running.
@@ -1172,7 +1155,7 @@ public class FSHLog implements WAL {
// This function releases one sync future only.
return 1;
}
-
+
/**
* Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
* @param currentSequence
@@ -1604,7 +1587,7 @@ public class FSHLog implements WAL {
* 'safe point' while the orchestrating thread does some work that requires the first thread
* paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
* thread.
- *
+ *
* <p>Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until
* Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
* Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused,
@@ -1612,7 +1595,7 @@ public class FSHLog implements WAL {
* it flags B and then Thread A and Thread B continue along on their merry way. Pause and
* signalling 'zigzags' between the two participating threads. We use two latches -- one the
* inverse of the other -- pausing and signaling when states are achieved.
- *
+ *
* <p>To start up the drama, Thread A creates an instance of this class each time it would do
* this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
* only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
@@ -1634,7 +1617,7 @@ public class FSHLog implements WAL {
* Latch to wait on. Will be released when we can proceed.
*/
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
-
+
/**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@@ -1643,7 +1626,7 @@ public class FSHLog implements WAL {
* @throws InterruptedException
* @throws ExecutionException
* @return The passed <code>syncFuture</code>
- * @throws FailedSyncBeforeLogCloseException
+ * @throws FailedSyncBeforeLogCloseException
*/
SyncFuture waitSafePoint(final SyncFuture syncFuture)
throws InterruptedException, FailedSyncBeforeLogCloseException {
@@ -1655,7 +1638,7 @@ public class FSHLog implements WAL {
}
return syncFuture;
}
-
+
/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1893,9 +1876,8 @@ public class FSHLog implements WAL {
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
-
- // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
- // region sequence id only, a region edit/sequence id that is not associated with an actual
+ // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
+ // region sequence id only, a region edit/sequence id that is not associated with an actual
// edit. It has to go through all the rigmarole to be sure we have the right ordering.
if (entry.getEdit().isEmpty()) {
return;
@@ -2053,4 +2035,4 @@ public class FSHLog implements WAL {
}
return new DatanodeInfo[0];
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index a768660..7f3eb61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
@@ -51,23 +50,18 @@ class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
- private final transient AtomicLong regionSequenceIdReference;
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
- private final transient List<Cell> memstoreCells;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
- final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
- final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
+ final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
super(key, edit);
- this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
- this.memstoreCells = memstoreCells;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
ArrayList<Cell> cells = this.getEdit().getCells();
@@ -111,24 +105,30 @@ class FSWALEntry extends Entry {
}
/**
- * Stamp this edit with a region edit/sequence id.
- * Call when safe to do so: i.e. the context is such that the increment on the passed in
- * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the
- * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this
- * method to be called.
- * @return The region edit/sequence id we set for this edit.
+ * Here is where a WAL edit gets its sequenceid.
+ * @return The sequenceid we stamped on this edit.
* @throws IOException
- * @see #getRegionSequenceId()
*/
long stampRegionSequenceId() throws IOException {
- long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
- if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
- for (Cell cell : this.memstoreCells) {
- CellUtil.setSequenceId(cell, regionSequenceId);
+ long regionSequenceId = WALKey.NO_SEQUENCE_ID;
+ MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
+ MultiVersionConcurrencyControl.WriteEntry we = null;
+
+ if (mvcc != null) {
+ we = mvcc.begin();
+ regionSequenceId = we.getWriteNumber();
+ }
+
+ if (!this.getEdit().isReplay() && inMemstore) {
+ for (Cell c:getEdit().getCells()) {
+ CellUtil.setSequenceId(c, regionSequenceId);
}
}
+
+ // This has to stay in this order
WALKey key = getKey();
key.setLogSeqNum(regionSequenceId);
+ key.setWriteEntry(we);
return regionSequenceId;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 5218981..1302d8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.Writable;
@@ -73,6 +74,13 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, now);
}
+ public HLogKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ final MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, mvcc);
+ }
+
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
@@ -86,9 +94,16 @@ public class HLogKey extends WALKey implements Writable {
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ public HLogKey(
+ final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -104,9 +119,14 @@ public class HLogKey extends WALKey implements Writable {
* @param nonceGroup
* @param nonce
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ public HLogKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ final MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -122,8 +142,8 @@ public class HLogKey extends WALKey implements Writable {
* @param nonce
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
- long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce);
+ long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce, mvcc);
}
/**
@@ -141,7 +161,8 @@ public class HLogKey extends WALKey implements Writable {
Compressor.writeCompressed(this.encodedRegionName, 0,
this.encodedRegionName.length, out,
compressionContext.regionDict);
- Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
+ Compressor.writeCompressed(this.tablename.getName(), 0,
+ this.tablename.getName().length, out,
compressionContext.tableDict);
}
out.writeLong(this.logSeqNum);
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
index cb89346..f7ae208 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
/**
* An HLogKey specific to WalEdits coming from replay.
@@ -32,13 +33,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public class ReplayHLogKey extends HLogKey {
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
}
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index be39873..e41e1c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -37,7 +37,8 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
+ HBaseInterfaceAudience.CONFIG})
public class SequenceFileLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
@@ -273,8 +274,10 @@ public class SequenceFileLogReader extends ReaderBase {
end = fEnd.getLong(this.reader);
} catch(NoSuchFieldException nfe) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(nfe);
} catch(IllegalAccessException iae) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(iae);
} catch(Exception e) {
/* All other cases. Should we handle it more aggressively? */
LOG.warn("Unexpected exception when accessing the end field", e);
@@ -293,8 +296,10 @@ public class SequenceFileLogReader extends ReaderBase {
.initCause(ioe);
} catch(NoSuchMethodException nfe) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(nfe);
} catch(IllegalAccessException iae) {
/* reflection failure, keep going */
+ if (LOG.isTraceEnabled()) LOG.trace(iae);
} catch(Exception e) {
/* All other cases. Should we handle it more aggressively? */
LOG.warn("Unexpected exception when accessing the end field", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index a752ff1..3b774ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -73,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
* where, the WALEdit is serialized as:
* <-1, # of edits, <KeyValue>, <KeyValue>, ... >
* For example:
- * <-1, 3, <Keyvalue-for-edit-c1>, <KeyValue-for-edit-c2>, <KeyValue-for-edit-c3>>
+ * <-1, 3, <KV-for-edit-c1>, <KV-for-edit-c2>, <KV-for-edit-c3>>
*
* The -1 marker is just a special way of being backward compatible with
* an old WAL which would have contained a single <KeyValue>.
@@ -104,6 +104,9 @@ public class WALEdit implements Writable, HeapSize {
public static final WALEdit EMPTY_WALEDIT = new WALEdit();
// Only here for legacy writable deserialization
+ /**
+ * @deprecated Legacy
+ */
@Deprecated
private NavigableMap<byte[], Integer> scopes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 399623f..2718295 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,12 +20,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -34,6 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -48,19 +46,27 @@ import com.google.protobuf.TextFormat;
public class WALUtil {
private static final Log LOG = LogFactory.getLog(WALUtil.class);
+ private WALUtil() {
+ // Shut down construction of this class.
+ }
+
/**
* Write the marker that a compaction has succeeded and is about to be committed.
* This provides info to the HMaster to allow it to recover the compaction if
* this regionserver dies in the middle (This part is not yet implemented). It also prevents
* the compaction from finishing if this regionserver has already lost its lease on the log.
- * @param sequenceId Used by WAL to get sequence Id for the waledit.
+ * @param mvcc Used by WAL to get sequence Id for the waledit.
*/
- public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
+ public static void writeCompactionMarker(WAL log,
+ HTableDescriptor htd,
+ HRegionInfo info,
+ final CompactionDescriptor c,
+ MultiVersionConcurrencyControl mvcc) throws IOException {
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
+ log.append(htd, info, key, WALEdit.createCompaction(info, c), false);
+ mvcc.complete(key.getWriteEntry());
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
@@ -70,13 +76,17 @@ public class WALUtil {
/**
* Write a flush marker indicating a start / abort or a complete of a region flush
*/
- public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
+ public static long writeFlushMarker(WAL log,
+ HTableDescriptor htd,
+ HRegionInfo info,
+ final FlushDescriptor f,
+ boolean sync,
+ MultiVersionConcurrencyControl mvcc) throws IOException {
TableName tn = TableName.valueOf(f.getTableName().toByteArray());
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
- null);
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
+ long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false);
+ mvcc.complete(key.getWriteEntry());
if (sync) log.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
@@ -88,12 +98,11 @@ public class WALUtil {
* Write a region open marker indicating that the region is opened
*/
public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
+ final RegionEventDescriptor r) throws IOException {
TableName tn = TableName.valueOf(r.getTableName().toByteArray());
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
- sequenceId, false, null);
+ long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), false);
log.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
@@ -108,27 +117,22 @@ public class WALUtil {
* @param htd A description of the table that we are bulk loading into.
* @param info A description of the region in the table that we are bulk loading into.
* @param descriptor A protocol buffers based description of the client's bulk loading request
- * @param sequenceId The current sequenceId in the log at the time when we were to write the
- * bulk load marker.
* @return txid of this transaction or if nothing to do, the last txid
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
public static long writeBulkLoadMarkerAndSync(final WAL wal,
final HTableDescriptor htd,
final HRegionInfo info,
- final WALProtos.BulkLoadDescriptor descriptor,
- final AtomicLong sequenceId) throws IOException {
+ final WALProtos.BulkLoadDescriptor descriptor)
+ throws IOException {
TableName tn = info.getTable();
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
// Add it to the log but the false specifies that we don't need to add it to the memstore
long trx = wal.append(htd,
- info,
- key,
- WALEdit.createBulkLoadEvent(info, descriptor),
- sequenceId,
- false,
- new ArrayList<Cell>());
+ info,
+ key,
+ WALEdit.createBulkLoadEvent(info, descriptor), false);
wal.sync(trx);
if (LOG.isTraceEnabled()) {
@@ -136,5 +140,4 @@ public class WALUtil {
}
return trx;
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
index f628cee..84d6128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
@@ -55,7 +55,7 @@ public class HashedBytes {
if (obj == null || getClass() != obj.getClass())
return false;
HashedBytes other = (HashedBytes) obj;
- return Arrays.equals(bytes, other.bytes);
+ return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 04045ec..191d546 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -156,7 +156,7 @@ class DisabledWALProvider implements WALProvider {
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
+ boolean inMemstore) {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 4844487..ce34c98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -114,19 +114,16 @@ public interface WAL {
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
- * @param htd used to give scope for replication TODO refactor out in favor of table name and info
- * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
- * source of its incrementing edits sequence id. Inside in this call we will increment it and
- * attach the sequence to the edit we apply the WAL.
+ * @param htd used to give scope for replication TODO refactor out in favor of table name and
+ * info
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
- * @param memstoreKVs list of KVs added into memstore
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
+ boolean inMemstore)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 74284e0..48ede4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -68,13 +69,47 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
*
* Note that protected members marked @InterfaceAudience.Private are only protected
* to support the legacy HLogKey class, which is in a different package.
+ *
+ * <p>
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into WALEntry.
+// TODO: Cleanup. We have logSeqNum and then WriteEntry, both are sequence id'ing. Fix.
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class WALKey implements SequenceId, Comparable<WALKey> {
private static final Log LOG = LogFactory.getLog(WALKey.class);
+ @InterfaceAudience.Private // For internal use only.
+ public MultiVersionConcurrencyControl getMvcc() {
+ return mvcc;
+ }
+
+ /**
+ * Will block until a write entry has been assigned by the WAL subsystem.
+ * @return A WriteEntry gotten from local WAL subsystem. Must be completed by calling
+ * mvcc#complete or mvcc#completeAndWait.
+ * @throws InterruptedIOException
+ * @see
+ * #setWriteEntry(org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry)
+ */
+ @InterfaceAudience.Private // For internal use only.
+ public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
+ try {
+ this.seqNumAssignedLatch.await();
+ } catch (InterruptedException ie) {
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ return writeEntry;
+ }
+
+ @InterfaceAudience.Private // For internal use only.
+ public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
+ this.writeEntry = writeEntry;
+ this.seqNumAssignedLatch.countDown();
+ }
+
// should be < 0 (@see HLogKey#readFields(DataInput))
// version 2 supports WAL compression
// public members here are only public because of HLogKey
@@ -151,7 +186,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
- static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
+ private MultiVersionConcurrencyControl mvcc;
+ private MultiVersionConcurrencyControl.WriteEntry writeEntry;
+ public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
// visible for deprecated HLogKey
@InterfaceAudience.Private
@@ -159,16 +196,17 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
- new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
@VisibleForTesting
- public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
public WALKey(final byte[] encodedRegionName, final TableName tablename) {
@@ -176,8 +214,28 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
- EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ init(encodedRegionName,
+ tablename,
+ NO_SEQUENCE_ID,
+ now,
+ EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ null);
+ }
+
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName,
+ tablename,
+ NO_SEQUENCE_ID,
+ now,
+ EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ mvcc);
}
/**
@@ -187,15 +245,21 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
- * @param tablename - name of table
- * @param logSeqNum - log sequence number
- * @param now Time at which this edit was written.
- * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -204,17 +268,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
- * @param now Time at which this edit was written.
- * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup
* @param nonce
+ * @param mvcc mvcc control used to generate sequence numbers and control read/write points
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup,
+ final long nonce, final MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -223,21 +288,37 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
- * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param logSeqNum
* @param nonceGroup
* @param nonce
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
- long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
- EMPTY_UUIDS, nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ long nonceGroup,
+ long nonce,
+ final MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName,
+ tablename,
+ logSeqNum,
+ EnvironmentEdgeManager.currentTime(),
+ EMPTY_UUIDS,
+ nonceGroup,
+ nonce,
+ mvcc);
}
@InterfaceAudience.Private
- protected void init(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ protected void init(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@@ -245,6 +326,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.tablename = tablename;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
+ this.mvcc = mvcc;
}
/**
@@ -270,15 +352,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
/**
- * Allow that the log sequence id to be set post-construction and release all waiters on assigned
- * sequence number.
+ * Allow the log sequence id to be set post-construction.
* Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
* @param sequence
*/
@InterfaceAudience.Private
public void setLogSeqNum(final long sequence) {
this.logSeqNum = sequence;
- this.seqNumAssignedLatch.countDown();
+
}
/**
@@ -492,21 +573,22 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.encodedRegionName = encodedRegionName;
}
- public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
- throws IOException {
- org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
+ WALCellCodec.ByteStringCompressor compressor) throws IOException {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
- compressionContext.regionDict));
+ compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
- compressionContext.tableDict));
+ compressionContext.tableDict));
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
- if(this.origLogSeqNum > 0) {
+ if (this.origLogSeqNum > 0) {
builder.setOrigSequenceNumber(this.origLogSeqNum);
}
if (this.nonce != HConstants.NO_NONCE) {
@@ -532,8 +614,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return builder;
}
- public void readFieldsFromPb(
- org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+ public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
+ WALCellCodec.ByteStringUncompressor uncompressor)
+ throws IOException {
if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress(
walKey.getEncodedRegionName(), compressionContext.regionDict);
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 51043e4..3741cdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2301,7 +2301,7 @@ public class WALSplitter {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
- clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
+ clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
logEntry.setFirst(key);
logEntry.setSecond(val);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
index 7fd8902..1665e66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 6223b15..bb216b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,12 +45,12 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -225,8 +224,7 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompaction() throws Exception {
- doTest(BlockCompactionsInPrepRegion.class, false);
- doTest(BlockCompactionsInPrepRegion.class, true);
+ doTest(BlockCompactionsInPrepRegion.class);
}
/**
@@ -237,13 +235,11 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
- doTest(BlockCompactionsInCompletionRegion.class, false);
- doTest(BlockCompactionsInCompletionRegion.class, true);
+ doTest(BlockCompactionsInCompletionRegion.class);
}
- public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
+ public void doTest(Class<?> regionClass) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
- c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
@@ -285,7 +281,7 @@ public class TestIOFencing {
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
- oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+ oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
@@ -356,4 +352,4 @@ public class TestIOFencing {
TEST_UTIL.shutdownMiniCluster();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 37e98e8..a064bcc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -376,7 +376,6 @@ public class TestReplicasClient {
}
}
-
@Test
public void testFlushTable() throws Exception {
openRegion(hriSecondary);
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 78deed9..0a4ca16 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -141,6 +141,4 @@ public class TestRegionObserverStacking extends TestCase {
assertTrue(idB < idC);
HBaseTestingUtility.closeRegionAndWAL(region);
}
-
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 8993255..7772664 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,7 +29,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -188,7 +188,6 @@ public class TestWALObserver {
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
- final AtomicLong sequenceId = new AtomicLong(0);
// TEST_FAMILY[0] shall be removed from WALEdit.
// TEST_FAMILY[1] value shall be changed.
@@ -237,7 +236,7 @@ public class TestWALObserver {
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- edit, sequenceId, true, null);
+ edit, true);
log.sync(txid);
// the edit shall have been change now by the coprocessor.
@@ -273,7 +272,7 @@ public class TestWALObserver {
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
final HRegionInfo hri = new HRegionInfo(tableName, null, null);
- final AtomicLong sequenceId = new AtomicLong(0);
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
@@ -300,7 +299,7 @@ public class TestWALObserver {
final int countPerFamily = 5;
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
}
LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -324,7 +323,7 @@ public class TestWALObserver {
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
- final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
+ final long txid = wal.append(htd, hri, legacyKey, edit, true);
wal.sync(txid);
LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -349,7 +348,7 @@ public class TestWALObserver {
public void testEmptyWALEditAreNotSeen() throws Exception {
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
- final AtomicLong sequenceId = new AtomicLong(0);
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
try {
@@ -361,8 +360,9 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
- long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- new WALEdit(), sequenceId, true, null);
+ long txid = log.append(htd, hri,
+ new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+ new WALEdit(), true);
log.sync(txid);
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
@@ -381,7 +381,7 @@ public class TestWALObserver {
// ultimately called by HRegion::initialize()
TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
- final AtomicLong sequenceId = new AtomicLong(0);
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
// final HRegionInfo hri =
// createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
// final HRegionInfo hri1 =
@@ -405,10 +405,9 @@ public class TestWALObserver {
// for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
}
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
- true, null);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
// sync to fs.
wal.sync();
@@ -528,7 +527,7 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
+ final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
String familyStr = Bytes.toString(family);
long txid = -1;
for (int j = 0; j < count; j++) {
@@ -539,7 +538,7 @@ public class TestWALObserver {
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
- ee.currentTime()), edit, sequenceId, true, null);
+ ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {
wal.sync(txid);
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index d82f36b..5fa588b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -32,8 +32,8 @@ import org.junit.experimental.categories.Category;
public class TestHLogRecordReader extends TestWALRecordReader {
@Override
- protected WALKey getWalKey(final long sequenceid) {
- return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+ protected WALKey getWalKey(final long time) {
+ return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 64ef8fd..a4381c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -75,6 +76,7 @@ public class TestWALRecordReader {
private static final byte [] value = Bytes.toBytes("value");
private static HTableDescriptor htd;
private static Path logDir;
+ protected MultiVersionConcurrencyControl mvcc;
private static String getName() {
return "TestWALRecordReader";
@@ -82,6 +84,7 @@ public class TestWALRecordReader {
@Before
public void setUp() throws Exception {
+ mvcc = new MultiVersionConcurrencyControl();
FileStatus[] entries = fs.listStatus(hbaseDir);
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
@@ -124,13 +127,11 @@ public class TestWALRecordReader {
// being millisecond based.
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
- final AtomicLong sequenceId = new AtomicLong(0);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+ log.append(htd, info, getWalKey(ts), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts+1), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@@ -141,12 +142,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts1+1), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts1+2), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@@ -188,8 +187,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
- long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
- null);
+ long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -199,8 +197,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
- txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
- null);
+ txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@@ -239,8 +236,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
- protected WALKey getWalKey(final long sequenceid) {
- return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+ protected WALKey getWalKey(final long time) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
}
protected WALRecordReader getReader() {