You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/09/23 02:23:22 UTC
[4/5] hbase git commit: HBASE-12751 Allow RowLock to be reader writer
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2c145b4..a8ffa8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -46,7 +46,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -147,7 +146,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -203,13 +201,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private static final Log LOG = LogFactory.getLog(HRegion.class);
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
- "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+ "hbase.hregion.scan.loadColumnFamiliesOnDemand";
/**
* Longest time we'll wait on a sequenceid.
* Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
- * it without cleanup previous usage properly; generally, a WAL roll is needed.
- * Key to use changing the default of 30000ms.
+ * it without cleaning up previous usage properly; generally, a WAL roll is needed. The timeout
+ * is for a latch in WALKey. There is no global accounting of outstanding WALKeys; this is
+ * intentional, to avoid contention, but it means that after an abort or other problem we
+ * could be stuck waiting on the WALKey latch. Revisit.
*/
private final int maxWaitForSeqId;
private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
@@ -222,6 +222,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
final AtomicBoolean closed = new AtomicBoolean(false);
+
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
* master as a region to close if the carrying regionserver is overloaded.
@@ -241,19 +242,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
*/
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
- /**
- * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
- * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
- * Its default value is -1L. This default is used as a marker to indicate
- * that the region hasn't opened yet. Once it is opened, it is set to the derived
- * #openSeqNum, the largest sequence id of all hfiles opened under this Region.
- *
- * <p>Control of this sequence is handed off to the WAL implementation. It is responsible
- * for tagging edits with the correct sequence id since it is responsible for getting the
- * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
- * OUTSIDE OF THE WAL. The value you get will not be what you think it is.
- */
- private final AtomicLong sequenceId = new AtomicLong(-1L);
/**
* The sequence id of the last replayed open region event from the primary region. This is used
@@ -371,7 +359,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* @return The smallest mvcc readPoint across all the scanners in this
- * region. Writes older than this readPoint, are included in every
+ * region. Writes older than this readPoint, are included in every
* read operation.
*/
public long getSmallestReadPoint() {
@@ -380,7 +368,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// no new RegionScanners can grab a readPoint that we are unaware of.
// We achieve this by synchronizing on the scannerReadPoints object.
synchronized(scannerReadPoints) {
- minimumReadPoint = mvcc.memstoreReadPoint();
+ minimumReadPoint = mvcc.getReadPoint();
for (Long readPoint: this.scannerReadPoints.values()) {
if (readPoint < minimumReadPoint) {
@@ -592,8 +580,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private boolean splitRequest;
private byte[] explicitSplitPoint = null;
- private final MultiVersionConcurrencyControl mvcc =
- new MultiVersionConcurrencyControl();
+ private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
// Coprocessor host
private RegionCoprocessorHost coprocessorHost;
@@ -629,6 +616,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @deprecated Use other constructors.
*/
@Deprecated
+ @VisibleForTesting
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
final Configuration confParam, final HRegionInfo regionInfo,
final HTableDescriptor htd, final RegionServerServices rsServices) {
@@ -819,7 +807,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
- long maxSeqId = initializeRegionStores(reporter, status, false);
+ long maxSeqId = initializeStores(reporter, status);
+ this.mvcc.advanceTo(maxSeqId);
+ if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
+ // Recover any edits if available.
+ maxSeqId = Math.max(maxSeqId,
+ replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+ // Make sure mvcc is up to max.
+ this.mvcc.advanceTo(maxSeqId);
+ }
this.lastReplayedOpenRegionSeqId = maxSeqId;
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
@@ -882,10 +878,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return nextSeqid;
}
- private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
- boolean warmupOnly)
- throws IOException {
-
+ /**
+ * Open all Stores.
+ * @param reporter
+ * @param status
+ * @return Highest sequenceId found out in a Store.
+ * @throws IOException
+ */
+ private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
+ throws IOException {
// Load in all the HStores.
long maxSeqId = -1;
@@ -947,14 +948,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
}
- if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
- // Recover any edits if available.
- maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
- this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
- }
- maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
- mvcc.initialize(maxSeqId);
- return maxSeqId;
+ return Math.max(maxSeqId, maxMemstoreTS + 1);
}
private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
@@ -962,7 +956,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Initialize all the HStores
status.setStatus("Warming up all the Stores");
- initializeRegionStores(reporter, status, true);
+ initializeStores(reporter, status);
}
private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
@@ -978,8 +972,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
- getSequenceId());
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc);
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -993,17 +986,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
- RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
+ RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
- getSequenceId());
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc);
// Store SeqId in HDFS when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
- getSequenceId().get(), 0);
+ mvcc.getReadPoint(), 0);
}
}
@@ -1275,7 +1267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This scan can read even uncommitted transactions
return Long.MAX_VALUE;
}
- return mvcc.memstoreReadPoint();
+ return mvcc.getReadPoint();
}
@Override
@@ -1955,11 +1947,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean shouldFlushStore(Store store) {
long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
store.getFamily().getName()) - 1;
- if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) {
+ if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
- " is > " + this.flushPerChanges + " from current=" + sequenceId.get());
+ " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint());
}
return true;
}
@@ -1985,7 +1977,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
whyFlush.setLength(0);
// This is a rough measure.
if (this.maxFlushedSeqId > 0
- && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
+ && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
return true;
}
@@ -2075,11 +2067,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- protected PrepareFlushResult internalPrepareFlushCache(
- final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
- MonitoredTask status, boolean writeFlushWalMarker)
- throws IOException {
-
+ protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
+ final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+ throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -2089,7 +2079,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.memstoreSize.get() <= 0) {
// Take an update lock because am about to change the sequence id and we want the sequence id
// to be at the border of the empty memstore.
- MultiVersionConcurrencyControl.WriteEntry w = null;
+ MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
this.updatesLock.writeLock().lock();
try {
if (this.memstoreSize.get() <= 0) {
@@ -2097,29 +2087,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// this region out in the WAL subsystem so no need to do any trickery clearing out
// edits in the WAL system. Up the sequence number so the resulting flush id is for
// sure just beyond the last appended region edit (useful as a marker when bulk loading,
- // etc.)
- // wal can be null replaying edits.
+ // etc.). NOTE: The writeEntry write number is NOT in the WAL.. there is no WAL writing
+ // here.
if (wal != null) {
- w = mvcc.beginMemstoreInsert();
- long flushOpSeqId = getNextSequenceId(wal);
+ writeEntry = mvcc.begin();
+ long flushOpSeqId = writeEntry.getWriteNumber();
FlushResult flushResult = new FlushResultImpl(
- FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
- writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
- w.setWriteNumber(flushOpSeqId);
- mvcc.waitForPreviousTransactionsComplete(w);
- w = null;
+ FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ flushOpSeqId,
+ "Nothing to flush",
+ writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
+ // TODO: Lets see if we hang here, if there is a scenario where an outstanding reader
+ // with a read point is in advance of this write point.
+ mvcc.completeAndWait(writeEntry);
+ writeEntry = null;
return new PrepareFlushResult(flushResult, myseqid);
} else {
return new PrepareFlushResult(
- new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- "Nothing to flush", false),
+ new FlushResultImpl(
+ FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ "Nothing to flush",
+ false),
myseqid);
}
}
} finally {
this.updatesLock.writeLock().unlock();
- if (w != null) {
- mvcc.advanceMemstore(w);
+ if (writeEntry != null) {
+ mvcc.complete(writeEntry);
}
}
}
@@ -2130,10 +2125,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!isAllFamilies(storesToFlush)) {
perCfExtras = new StringBuilder();
for (Store store: storesToFlush) {
- perCfExtras.append("; ");
- perCfExtras.append(store.getColumnFamilyName());
- perCfExtras.append("=");
- perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize()));
+ perCfExtras.append("; ").append(store.getColumnFamilyName());
+ perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
}
}
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
@@ -2178,7 +2171,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long trxId = 0;
try {
try {
- writeEntry = mvcc.beginMemstoreInsert();
+ writeEntry = mvcc.begin();
if (wal != null) {
Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
@@ -2213,7 +2206,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), flushOpSeqId, committedFiles);
// no sync. Sync is below where we do not hold the updates lock
trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false);
+ desc, false, mvcc);
}
// Prepare flush (take a snapshot)
@@ -2227,7 +2220,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false);
+ desc, false, mvcc);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(t));
@@ -2261,18 +2254,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
- writeEntry.setWriteNumber(flushOpSeqId);
- mvcc.waitForPreviousTransactionsComplete(writeEntry);
- // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
+ mvcc.completeAndWait(writeEntry);
+ // set writeEntry to null to prevent mvcc.complete from being called again inside finally
+ // block
writeEntry = null;
} finally {
if (writeEntry != null) {
- // in case of failure just mark current writeEntry as complete
- mvcc.advanceMemstore(writeEntry);
+ // In case of failure just mark current writeEntry as complete.
+ mvcc.complete(writeEntry);
}
}
- return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
- flushedSeqId, totalFlushableSizeOfFlushableStores);
+ return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
+ flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
}
/**
@@ -2292,10 +2285,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
- getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
+ getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, true);
+ desc, true, mvcc);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2364,7 +2357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, true);
+ desc, true, mvcc);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@@ -2378,7 +2371,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false);
+ desc, false, mvcc);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
@@ -2456,7 +2449,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
// Also, getSequenceId blocks on a latch. There is no global list of outstanding latches,
// so on an abort or stop there is no way to release them.
- WALKey key = this.appendEmptyEdit(wal, null);
+ WALKey key = this.appendEmptyEdit(wal);
+ mvcc.complete(key.getWriteEntry());
return key.getSequenceId(this.maxWaitForSeqId);
}
@@ -2885,7 +2879,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
- List<Cell> memstoreCells = new ArrayList<Cell>();
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
@@ -2950,17 +2943,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we haven't got any rows in our batch, we should block to
// get the next one.
- boolean shouldBlock = numReadyToWrite == 0;
RowLock rowLock = null;
try {
- rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
+ rowLock = getRowLock(mutation.getRow(), true);
} catch (IOException ioe) {
LOG.warn("Failed getting lock in batch put, row="
+ Bytes.toStringBinary(mutation.getRow()), ioe);
}
if (rowLock == null) {
// We failed to grab another lock
- assert !shouldBlock : "Should never fail to get lock when blocking";
+ assert false: "Should never fail to get lock when blocking";
break; // stop acquiring more rows for this batch
} else {
acquiredRowLocks.add(rowLock);
@@ -3020,16 +3012,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
- if(isInReplay) {
- mvccNum = batchOp.getReplaySequenceId();
- } else {
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
- }
- //
- // ------------------------------------
- // Acquire the latest mvcc number
- // ----------------------------------
- writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
@@ -3040,35 +3022,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// ------------------------------------
- // STEP 3. Write back to memstore
- // Write to memstore. It is ok to write to memstore
- // first without updating the WAL because we do not roll
- // forward the memstore MVCC. The MVCC will be moved up when
- // the complete operation is done. These changes are not yet
- // visible to scanners till we update the MVCC. The MVCC is
- // moved only when the sync is complete.
- // ----------------------------------
- long addedSize = 0;
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
- continue;
- }
- doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
- addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
- }
-
- // ------------------------------------
- // STEP 4. Build WAL edit
+ // STEP 3. Build WAL edit
// ----------------------------------
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
+ if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue;
}
- batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.getMutation(i);
Durability tmpDur = getEffectiveDurability(m.getDurability());
@@ -3094,9 +3055,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
- currentNonceGroup, currentNonce);
+ currentNonceGroup, currentNonce, mvcc);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
- walEdit, getSequenceId(), true, null);
+ walEdit, true);
walEdit = new WALEdit(isInReplay);
walKey = null;
}
@@ -3115,38 +3076,57 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// -------------------------
- // STEP 5. Append the final edit to WAL. Do not sync wal.
+ // STEP 4. Append the final edit to WAL. Do not sync wal.
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (isInReplay) {
// use wal key from the original
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
long replaySeqId = batchOp.getReplaySequenceId();
walKey.setOrigLogSeqNum(replaySeqId);
-
- // ensure that the sequence id of the region is at least as big as orig log seq id
- while (true) {
- long seqId = getSequenceId().get();
- if (seqId >= replaySeqId) break;
- if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
- }
}
if (walEdit.size() > 0) {
if (!isInReplay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
}
-
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
- getSequenceId(), true, memstoreCells);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ }
+ // ------------------------------------
+ // Acquire the latest mvcc number
+ // ----------------------------------
+ if (walKey == null) {
+ // If this is a skip wal operation just get the read point from mvcc
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+ if (!isInReplay) {
+ writeEntry = walKey.getWriteEntry();
+ mvccNum = writeEntry.getWriteNumber();
+ } else {
+ mvccNum = batchOp.getReplaySequenceId();
}
- if (walKey == null){
- // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
+
+ // ------------------------------------
+ // STEP 5. Write back to memstore
+ // Write to memstore. It is ok to write to memstore
+ // first without syncing the WAL because we do not roll
+ // forward the memstore MVCC. The MVCC will be moved up when
+ // the complete operation is done. These changes are not yet
+ // visible to scanners till we update the MVCC. The MVCC is
+ // moved only when the sync is complete.
+ // ----------------------------------
+ long addedSize = 0;
+ for (int i = firstIndex; i < lastIndexExclusive; i++) {
+ if (batchOp.retCodeDetails[i].getOperationStatusCode()
+ != OperationStatusCode.NOT_RUN) {
+ continue;
+ }
+ doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
}
// -------------------------------
@@ -3174,13 +3154,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postBatchMutate(miniBatchOp);
}
-
// ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------
if (writeEntry != null) {
- mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+ mvcc.completeAndWait(writeEntry);
writeEntry = null;
+ } else if (isInReplay) {
+ // ensure that the sequence id of the region is at least as big as orig log seq id
+ mvcc.advanceTo(mvccNum);
+ }
+
+ for (int i = firstIndex; i < lastIndexExclusive; i ++) {
+ if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
+ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+ }
}
// ------------------------------------
@@ -3208,10 +3196,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
- if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
+ for (int j = 0; j < familyMaps.length; j++) {
+ for(List<Cell> cells:familyMaps[j].values()) {
+ rollbackMemstore(cells);
+ }
+ }
+ if (writeEntry != null) mvcc.complete(writeEntry);
} else if (writeEntry != null) {
- mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+ mvcc.completeAndWait(writeEntry);
}
if (locked) {
@@ -3298,7 +3290,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.await();
try {
if (this.getCoprocessorHost() != null) {
Boolean processed = null;
@@ -3407,7 +3399,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.await();
try {
List<Cell> result = get(get, false);
@@ -3484,7 +3476,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private void doBatchMutate(Mutation mutation) throws IOException {
// Currently this is only called for puts and deletes, so no nonces.
- OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
+ OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -3669,7 +3661,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
- long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
+ long mvccNum, boolean isInReplay) throws IOException {
long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3680,10 +3672,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int listSize = cells.size();
for (int i=0; i < listSize; i++) {
Cell cell = cells.get(i);
- CellUtil.setSequenceId(cell, mvccNum);
+ if (cell.getSequenceId() == 0) {
+ CellUtil.setSequenceId(cell, mvccNum);
+ }
Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst();
- memstoreCells.add(ret.getSecond());
if(isInReplay) {
// set memstore newly added cells with replay mvcc number
CellUtil.setSequenceId(ret.getSecond(), mvccNum);
@@ -4440,12 +4433,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.maxFlushedSeqId = flush.getFlushSequenceNumber();
// advance the mvcc read point so that the new flushed file is visible.
- // there may be some in-flight transactions, but they won't be made visible since they are
- // either greater than flush seq number or they were already dropped via flush.
- // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
- // stores while they are still in flight because the flush commit marker will not contain
- // flushes from ALL stores.
- getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+ mvcc.advanceTo(flush.getFlushSequenceNumber());
} catch (FileNotFoundException ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -4512,15 +4500,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Drops the memstore contents after replaying a flush descriptor or region open event replay
* if the memstore edits have seqNums smaller than the given seq id
- * @param flush the flush descriptor
* @throws IOException
*/
private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
long totalFreedSize = 0;
this.updatesLock.writeLock().lock();
try {
- mvcc.waitForPreviousTransactionsComplete();
- long currentSeqId = getSequenceId().get();
+
+ long currentSeqId = mvcc.getReadPoint();
if (seqId >= currentSeqId) {
// then we can drop the memstore contents since everything is below this seqId
LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4683,9 +4670,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed file is visible.
- // there may be some in-flight transactions, but they won't be made visible since they are
- // either greater than flush seq number or they were already dropped via flush.
- getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+ mvcc.await();
// If we were waiting for observing a flush or region opening event for not showing partial
// data after a secondary region crash, we can allow reads now.
@@ -4776,7 +4761,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
- getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
+ mvcc.advanceTo(bulkLoadEvent.getBulkloadSeqNum());
}
} finally {
closeBulkRegionOperation();
@@ -4875,11 +4860,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed files are visible.
- // there may be some in-flight transactions, but they won't be made visible since they are
- // either greater than flush seq number or they were already picked up via flush.
- for (Store s : getStores()) {
- getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
- }
+ // either greater than flush seq number or they were already picked up via flush.
+ for (Store s : getStores()) {
+ mvcc.advanceTo(s.getMaxMemstoreTS());
+ }
+
// smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
// skip all edits that are to be replayed in the future with that has a smaller seqId
@@ -5037,75 +5022,91 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- @Override
- public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
- startRegionOperation();
- try {
- return getRowLockInternal(row, waitForLock);
- } finally {
- closeRegionOperation();
- }
+
+ /**
+ * Get an exclusive ( write lock ) lock on a given row.
+ * @param row Which row to lock.
+ * @return A locked RowLock. The lock is exclusive and already acquired.
+ * @throws IOException
+ */
+ public RowLock getRowLock(byte[] row) throws IOException {
+ return getRowLock(row, false);
}
/**
- * A version of getRowLock(byte[], boolean) to use when a region operation has already been
+ *
+ * Get a row lock for the specified row. All locks are reentrant.
+ *
+ * Before calling this function make sure that a region operation has already been
* started (the calling thread has already acquired the region-close-guard lock).
+ * @param row The row actions will be performed against
+ * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+ * lock is requested
*/
- protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
+ public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+ // Make sure the row is inside of this region before getting the lock for it.
+ checkRow(row, "row lock");
+ // create an object to use as a key in the row lock map
HashedBytes rowKey = new HashedBytes(row);
- RowLockContext rowLockContext = new RowLockContext(rowKey);
- // loop until we acquire the row lock (unless !waitForLock)
- while (true) {
- RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
- if (existingContext == null) {
- // Row is not already locked by any thread, use newly created context.
- break;
- } else if (existingContext.ownedByCurrentThread()) {
- // Row is already locked by current thread, reuse existing context instead.
- rowLockContext = existingContext;
- break;
- } else {
- if (!waitForLock) {
- return null;
+ RowLockContext rowLockContext = null;
+ RowLockImpl result = null;
+ TraceScope traceScope = null;
+
+ // If we're tracing start a span to show how long this took.
+ if (Trace.isTracing()) {
+ traceScope = Trace.startSpan("HRegion.getRowLock");
+ traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
+ }
+
+ try {
+ // Keep trying until we have a lock or error out.
+ // TODO: do we need to add a time component here?
+ while (result == null) {
+
+ // Try adding a RowLockContext to the lockedRows.
+ // If we can add it then there are no other transactions currently running.
+ rowLockContext = new RowLockContext(rowKey);
+ RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+
+ // if there was a running transaction then there's already a context.
+ if (existingContext != null) {
+ rowLockContext = existingContext;
}
- TraceScope traceScope = null;
- try {
- if (Trace.isTracing()) {
- traceScope = Trace.startSpan("HRegion.getRowLockInternal");
- }
- // Row is already locked by some other thread, give up or wait for it
- if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
- if(traceScope != null) {
- traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
- }
- throw new IOException("Timed out waiting for lock for row: " + rowKey);
- }
- if (traceScope != null) traceScope.close();
- traceScope = null;
- } catch (InterruptedException ie) {
- LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- } finally {
- if (traceScope != null) traceScope.close();
+
+ // Now try and get the lock.
+ //
+ // This can fail (return null) if the context was cleaned up concurrently by
+ // another thread, in which case we loop and create a fresh context.
+ if (readLock) {
+ result = rowLockContext.newReadLock();
+ } else {
+ result = rowLockContext.newWriteLock();
}
}
+ if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+ if (traceScope != null) {
+ traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
+ }
+ result = null;
+ // Clean up the counts just in case this was the thing keeping the context alive.
+ rowLockContext.cleanUp();
+ throw new IOException("Timed out waiting for lock for row: " + rowKey);
+ }
+ return result;
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ if (traceScope != null) {
+ traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
+ }
+ Thread.currentThread().interrupt();
+ throw iie;
+ } finally {
+ if (traceScope != null) {
+ traceScope.close();
+ }
}
-
- // allocate new lock for this thread
- return rowLockContext.newLock();
- }
-
- /**
- * Acquires a lock on the given row.
- * The same thread may acquire multiple locks on the same row.
- * @return the acquired row lock
- * @throws IOException if the lock could not be acquired after waiting
- */
- public RowLock getRowLock(byte[] row) throws IOException {
- return getRowLock(row, true);
}
@Override
@@ -5118,6 +5119,97 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ @VisibleForTesting
+ class RowLockContext {
+ private final HashedBytes row;
+ final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+ final AtomicBoolean usable = new AtomicBoolean(true);
+ final AtomicInteger count = new AtomicInteger(0);
+ final Object lock = new Object();
+
+ RowLockContext(HashedBytes row) {
+ this.row = row;
+ }
+
+ RowLockImpl newWriteLock() {
+ Lock l = readWriteLock.writeLock();
+ return getRowLock(l);
+ }
+ RowLockImpl newReadLock() {
+ Lock l = readWriteLock.readLock();
+ return getRowLock(l);
+ }
+
+ private RowLockImpl getRowLock(Lock l) {
+ count.incrementAndGet();
+ synchronized (lock) {
+ if (usable.get()) {
+ return new RowLockImpl(this, l);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ void cleanUp() {
+ long c = count.decrementAndGet();
+ if (c <= 0) {
+ synchronized (lock) {
+ if (count.get() <= 0 ){
+ usable.set(false);
+ RowLockContext removed = lockedRows.remove(row);
+ assert removed == this: "we should never remove a different context";
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "RowLockContext{" +
+ "row=" + row +
+ ", readWriteLock=" + readWriteLock +
+ ", count=" + count +
+ '}';
+ }
+ }
+
+ /**
+ * Class used to represent a lock on a row.
+ */
+ public static class RowLockImpl implements RowLock {
+ private final RowLockContext context;
+ private final Lock lock;
+
+ public RowLockImpl(RowLockContext context, Lock lock) {
+ this.context = context;
+ this.lock = lock;
+ }
+
+ public Lock getLock() {
+ return lock;
+ }
+
+ @VisibleForTesting
+ public RowLockContext getContext() {
+ return context;
+ }
+
+ @Override
+ public void release() {
+ lock.unlock();
+ context.cleanUp();
+ }
+
+ @Override
+ public String toString() {
+ return "RowLockImpl{" +
+ "context=" + context +
+ ", lock=" + lock +
+ '}';
+ }
+ }
+
/**
* Determines whether multiple column families are present
* Precondition: familyPaths is not null
@@ -5263,7 +5355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
- loadDescriptor, sequenceId);
+ loadDescriptor);
} catch (IOException ioe) {
if (this.rsServices != null) {
// Have to abort region server because some hfiles has been loaded but we can't write
@@ -6010,13 +6102,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FileSystem fs = FileSystem.get(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
- HRegion region = HRegion.newHRegion(tableDir,
- wal, fs, conf, info, hTableDescriptor, null);
- if (initialize) {
- // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
- // verifying the WALEdits.
- region.setSequenceId(region.initialize(null));
- }
+ HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
+ if (initialize) region.initialize(null);
return region;
}
@@ -6229,7 +6316,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Refuse to open the region if a required class cannot be loaded
checkClassLoading();
this.openSeqNum = initialize(reporter);
- this.setSequenceId(openSeqNum);
+ this.mvcc.advanceTo(openSeqNum);
if (wal != null && getRegionServerServices() != null && !writestate.readOnly
&& !recovering) {
// Only write the region open event marker to WAL if (1) we are not read-only
@@ -6664,7 +6751,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<RowLock> acquiredRowLocks;
long addedSize = 0;
List<Mutation> mutations = new ArrayList<Mutation>();
- List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
WALKey walKey = null;
@@ -6673,13 +6759,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
+ // use a writer lock for mixed reads and writes
acquiredRowLocks.add(getRowLock(row));
}
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
locked = true;
- // Get a mvcc write number
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
long now = EnvironmentEdgeManager.currentTime();
try {
@@ -6689,11 +6774,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
- // 5. Start mvcc transaction
- writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
- // 6. Call the preBatchMutate hook
+
+ // 5. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);
- // 7. Apply to memstore
+
+ long txid = 0;
+ // 6. Append no sync
+ if (!walEdit.isEmpty()) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ processor.getClusterIds(), nonceGroup, nonce, mvcc);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+ walKey, walEdit, false);
+ }
+ if(walKey == null){
+ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+ // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+
+ // 7. Start mvcc transaction
+ writeEntry = walKey.getWriteEntry();
+ mvccNum = walKey.getSequenceId();
+
+
+
+ // 8. Apply to memstore
for (Mutation m : mutations) {
// Handle any tag based cell features
rewriteCellTags(m.getFamilyCellMap(), m);
@@ -6708,25 +6815,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
Pair<Long, Cell> ret = store.add(cell);
addedSize += ret.getFirst();
- memstoreCells.add(ret.getSecond());
}
}
- long txid = 0;
- // 8. Append no sync
- if (!walEdit.isEmpty()) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- processor.getClusterIds(), nonceGroup, nonce);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
- walKey, walEdit, getSequenceId(), true, memstoreCells);
- }
- if(walKey == null){
- // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
- // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
- }
// 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
@@ -6759,13 +6850,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (writeEntry != null) {
- mvcc.cancelMemstoreInsert(writeEntry);
+ mvcc.complete(writeEntry);
writeEntry = null;
}
}
// 13. Roll mvcc forward
if (writeEntry != null) {
- mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+ mvcc.completeAndWait(writeEntry);
}
if (locked) {
this.updatesLock.readLock().unlock();
@@ -6836,6 +6927,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ /**
+ * @param cell
+ * @param tags
+ * @return The passed-in List<Tag> but with the tags from <code>cell</code> added.
+ */
+ private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
+ if (cell.getTagsLength() <= 0) return tags;
+ List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags;
+ Iterator<Tag> i =
+ CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+ while (i.hasNext()) newTags.add(i.next());
+ return newTags;
+ }
+
+ /**
+ * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.
+ * @param store
+ * @param row
+ * @param family
+ * @param tr
+ * @return Get result.
+ * @throws IOException
+ */
+ private List<Cell> doGet(final Store store, final byte [] row,
+ final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
+ throws IOException {
+ // Sort the cells so that they match the order that they
+ // appear in the Get results. Otherwise, we won't be able to
+ // find the existing values if the cells are not specified
+ // in order by the client since cells are in an array list.
+ Collections.sort(family.getValue(), store.getComparator());
+ // Get previous values for all columns in this family
+ Get get = new Get(row);
+ for (Cell cell : family.getValue()) {
+ get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
+ }
+ if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
+ return get(get, false);
+ }
+
public Result append(Append append) throws IOException {
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
@@ -6845,64 +6976,50 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// transactions, so all stores only go through one code path for puts.
@Override
- public Result append(Append append, long nonceGroup, long nonce) throws IOException {
- byte[] row = append.getRow();
- checkRow(row, "append");
+ public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
+ Operation op = Operation.APPEND;
+ byte[] row = mutate.getRow();
+ checkRow(row, op.toString());
boolean flush = false;
- Durability durability = getEffectiveDurability(append.getDurability());
+ Durability durability = getEffectiveDurability(mutate.getDurability());
boolean writeToWAL = durability != Durability.SKIP_WAL;
WALEdit walEdits = null;
- List<Cell> allKVs = new ArrayList<Cell>(append.size());
+ List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
long size = 0;
long txid = 0;
-
checkReadOnly();
checkResources();
// Lock row
- startRegionOperation(Operation.APPEND);
+ startRegionOperation(op);
this.writeRequestsCount.increment();
- long mvccNum = 0;
- WriteEntry writeEntry = null;
- WALKey walKey = null;
RowLock rowLock = null;
- List<Cell> memstoreCells = new ArrayList<Cell>();
+ WALKey walKey = null;
+ MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
boolean doRollBackMemstore = false;
try {
rowLock = getRowLock(row);
+ assert rowLock != null;
try {
lock(this.updatesLock.readLock());
try {
- // wait for all prior MVCC transactions to finish - while we hold the row lock
- // (so that we are guaranteed to see the latest state)
- mvcc.waitForPreviousTransactionsComplete();
+ // Wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state when we do our Get)
+ mvcc.await();
if (this.coprocessorHost != null) {
- Result r = this.coprocessorHost.preAppendAfterRowLock(append);
- if(r!= null) {
+ Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);
+ if (r!= null) {
return r;
}
}
- // now start my own transaction
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
- writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTime();
// Process each family
- for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
-
+ for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {
Store store = stores.get(family.getKey());
List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
- // Sort the cells so that they match the order that they
- // appear in the Get results. Otherwise, we won't be able to
- // find the existing values if the cells are not specified
- // in order by the client since cells are in an array list.
- Collections.sort(family.getValue(), store.getComparator());
- // Get previous values for all columns in this family
- Get get = new Get(row);
- for (Cell cell : family.getValue()) {
- get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
- }
- List<Cell> results = get(get, false);
+ List<Cell> results = doGet(store, row, family, null);
+
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
@@ -6919,30 +7036,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long ts = Math.max(now, oldCell.getTimestamp());
// Process cell tags
- List<Tag> newTags = new ArrayList<Tag>();
-
// Make a union of the set of tags in the old and new KVs
-
- if (oldCell.getTagsLength() > 0) {
- Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
- oldCell.getTagsOffset(), oldCell.getTagsLength());
- while (i.hasNext()) {
- newTags.add(i.next());
- }
- }
- if (cell.getTagsLength() > 0) {
- Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
- cell.getTagsOffset(), cell.getTagsLength());
- while (i.hasNext()) {
- newTags.add(i.next());
- }
- }
+ List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>());
+ newTags = carryForwardTags(cell, newTags);
// Cell TTL handling
- if (append.getTTL() != Long.MAX_VALUE) {
+ if (mutate.getTTL() != Long.MAX_VALUE) {
// Add the new TTL tag
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
}
// Rebuild tags
@@ -6978,9 +7080,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Cell TTL handling
- if (append.getTTL() != Long.MAX_VALUE) {
+ if (mutate.getTTL() != Long.MAX_VALUE) {
List<Tag> newTags = new ArrayList<Tag>(1);
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
// Add the new TTL tag
newCell = new TagRewriteCell(cell, Tag.fromList(newTags));
} else {
@@ -6988,11 +7090,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
- append, oldCell, newCell);
+ mutate, oldCell, newCell);
}
kvs.add(newCell);
@@ -7009,47 +7110,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
tempMemstore.put(store, kvs);
}
- //Actually write to Memstore now
- for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- if (store.getFamily().getMaxVersions() == 1) {
- // upsert if VERSIONS for this CF == 1
- size += store.upsert(entry.getValue(), getSmallestReadPoint());
- memstoreCells.addAll(entry.getValue());
+ // Actually write to WAL now
+ if (walEdits != null && !walEdits.isEmpty()) {
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(
+ getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(),
+ WALKey.NO_SEQUENCE_ID,
+ nonceGroup,
+ nonce,
+ mvcc);
+ txid =
+ this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
} else {
- // otherwise keep older versions around
- for (Cell cell: entry.getValue()) {
- Pair<Long, Cell> ret = store.add(cell);
- size += ret.getFirst();
- memstoreCells.add(ret.getSecond());
- doRollBackMemstore = true;
- }
+ recordMutationWithoutWal(mutate.getFamilyCellMap());
}
- allKVs.addAll(entry.getValue());
- }
-
- // Actually write to WAL now
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
- this.sequenceId, true, memstoreCells);
- } else {
- recordMutationWithoutWal(append.getFamilyCellMap());
}
if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+
+ // now start my own transaction
+ writeEntry = walKey.getWriteEntry();
+
+
+ // Actually write to Memstore now
+ if (!tempMemstore.isEmpty()) {
+ for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ // Is this right? It immediately becomes visible? St.Ack 20150907
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (Cell cell: entry.getValue()) {
+ CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
+ Pair<Long, Cell> ret = store.add(cell);
+ size += ret.getFirst();
+ doRollBackMemstore = true;
+ }
+ }
+ // We add to all KVs here whereas when doing increment, we do it
+ // earlier... why?
+ allKVs.addAll(entry.getValue());
+ }
+
+ size = this.addAndGetGlobalMemstoreSize(size);
+ flush = isFlushSize(size);
}
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
} finally {
this.updatesLock.readLock().unlock();
}
+
} finally {
rowLock.release();
rowLock = null;
@@ -7065,13 +7183,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
- if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
+ rollbackMemstore(allKVs);
+ if (writeEntry != null) mvcc.complete(writeEntry);
} else if (writeEntry != null) {
- mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+ mvcc.completeAndWait(writeEntry);
}
- closeRegionOperation(Operation.APPEND);
+ closeRegionOperation(op);
}
if (this.metricsRegion != null) {
@@ -7083,8 +7201,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
requestFlush();
}
-
- return append.isReturnResults() ? Result.create(allKVs) : null;
+ return mutate.isReturnResults() ? Result.create(allKVs) : null;
}
public Result increment(Increment increment) throws IOException {
@@ -7095,89 +7212,73 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// We should refactor append and increment as local get-mutate-put
// transactions, so all stores only go through one code path for puts.
+ // They are subtly different in quite a few ways. This came out only
+ // after study. I am not sure that many of the differences are intentional.
+ // TODO: St.Ack 20150907
+
@Override
- public Result increment(Increment increment, long nonceGroup, long nonce)
+ public Result increment(Increment mutation, long nonceGroup, long nonce)
throws IOException {
- byte [] row = increment.getRow();
- checkRow(row, "increment");
- TimeRange tr = increment.getTimeRange();
+ Operation op = Operation.INCREMENT;
+ byte [] row = mutation.getRow();
+ checkRow(row, op.toString());
boolean flush = false;
- Durability durability = getEffectiveDurability(increment.getDurability());
+ Durability durability = getEffectiveDurability(mutation.getDurability());
boolean writeToWAL = durability != Durability.SKIP_WAL;
WALEdit walEdits = null;
- List<Cell> allKVs = new ArrayList<Cell>(increment.size());
+ List<Cell> allKVs = new ArrayList<Cell>(mutation.size());
+
Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
long size = 0;
long txid = 0;
-
checkReadOnly();
checkResources();
// Lock row
- startRegionOperation(Operation.INCREMENT);
+ startRegionOperation(op);
this.writeRequestsCount.increment();
RowLock rowLock = null;
- WriteEntry writeEntry = null;
WALKey walKey = null;
- long mvccNum = 0;
- List<Cell> memstoreCells = new ArrayList<Cell>();
+ MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
boolean doRollBackMemstore = false;
+ TimeRange tr = mutation.getTimeRange();
try {
rowLock = getRowLock(row);
+ assert rowLock != null;
try {
lock(this.updatesLock.readLock());
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.await();
if (this.coprocessorHost != null) {
- Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
+ Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation);
if (r != null) {
return r;
}
}
- // now start my own transaction
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
- writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTime();
// Process each family
- for (Map.Entry<byte [], List<Cell>> family:
- increment.getFamilyCellMap().entrySet()) {
-
+ for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
Store store = stores.get(family.getKey());
List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
- // Sort the cells so that they match the order that they
- // appear in the Get results. Otherwise, we won't be able to
- // find the existing values if the cells are not specified
- // in order by the client since cells are in an array list.
- Collections.sort(family.getValue(), store.getComparator());
- // Get previous values for all columns in this family
- Get get = new Get(row);
- for (Cell cell: family.getValue()) {
- get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
- }
- get.setTimeRange(tr.getMin(), tr.getMax());
- List<Cell> results = get(get, false);
+ List<Cell> results = doGet(store, row, family, tr);
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
+
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
+ // Would be nice if KeyValue had scatter/gather logic
int idx = 0;
+ // HERE WE DIVERGE FROM APPEND
List<Cell> edits = family.getValue();
for (int i = 0; i < edits.size(); i++) {
Cell cell = edits.get(i);
long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
- List<Tag> newTags = new ArrayList<Tag>();
-
- // Carry forward any tags that might have been added by a coprocessor
- if (cell.getTagsLength() > 0) {
- Iterator<Tag> itr = CellUtil.tagsIterator(cell.getTagsArray(),
- cell.getTagsOffset(), cell.getTagsLength());
- while (itr.hasNext()) {
- newTags.add(itr.next());
- }
- }
+
+ List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());
Cell c = null;
long ts = now;
@@ -7192,15 +7293,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"Attempted to increment field that isn't 64 bits wide");
}
// Carry tags forward from previous version
- if (c.getTagsLength() > 0) {
- Iterator<Tag> itr = CellUtil.tagsIterator(c.getTagsArray(),
- c.getTagsOffset(), c.getTagsLength());
- while (itr.hasNext()) {
- newTags.add(itr.next());
- }
- }
- if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
+ newTags = carryForwardTags(c, newTags);
+ if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {
idx++;
+ }
}
// Append new incremented KeyValue to list
@@ -7208,8 +7304,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] val = Bytes.toBytes(amount);
// Add the TTL tag if the mutation carried one
- if (increment.getTTL() != Long.MAX_VALUE) {
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
+ if (mutation.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));
}
Cell newKV = new KeyValue(row, 0, row.length,
@@ -7220,12 +7316,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
val, 0, val.length,
newTags);
- CellUtil.setSequenceId(newKV, mvccNum);
-
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(
- RegionObserver.MutationType.INCREMENT, increment, c, newKV);
+ RegionObserver.MutationType.INCREMENT, mutation, c, newKV);
}
allKVs.add(newKV);
@@ -7248,20 +7342,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- //Actually write to Memstore now
+ // Actually write to WAL now
+ if (walEdits != null && !walEdits.isEmpty()) {
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(),
+ WALKey.NO_SEQUENCE_ID,
+ nonceGroup,
+ nonce,
+ mvcc);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+ walKey, walEdits, true);
+ } else {
+ recordMutationWithoutWal(mutation.getFamilyCellMap());
+ }
+ }
+ if (walKey == null) {
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+
+ // now start my own transaction
+ writeEntry = walKey.getWriteEntry();
+
+ // Actually write to Memstore now
if (!tempMemstore.isEmpty()) {
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
+ // Is this right? It immediately becomes visible? St.Ack 20150907
size += store.upsert(entry.getValue(), getSmallestReadPoint());
- memstoreCells.addAll(entry.getValue());
} else {
// otherwise keep older versions around
for (Cell cell : entry.getValue()) {
+ CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst();
- memstoreCells.add(ret.getSecond());
doRollBackMemstore = true;
}
}
@@ -7269,26 +7390,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
}
-
- // Actually write to WAL now
- if (walEdits != null && !walEdits.isEmpty()) {
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
- walKey, walEdits, getSequenceId(), true, memstoreCells);
- } else {
- recordMutationWithoutWal(increment.getFamilyCellMap());
- }
- }
- if(walKey == null){
- // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
- }
} finally {
this.updatesLock.readLock().unlock();
}
@@ -7307,10 +7408,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
- if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
+ for(List<Cell> cells: tempMemstore.values()) {
+ rollbackMemstore(cells);
+ }
+ if (writeEntry != null) mvcc.complete(writeEntry);
} else if (writeEntry != null) {
- mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+ mvcc.completeAndWait(writeEntry);
}
closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) {
@@ -7322,7 +7425,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Request a cache flush. Do it outside update lock.
requestFlush();
}
- return increment.isReturnResults() ? Result.create(allKVs) : null;
+ return mutation.isReturnResults() ? Result.create(allKVs) : null;
}
//
@@ -7341,7 +7444,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+ 43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@@ -7487,7 +7590,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new IOException("Not a known catalog table: " + p.toString());
}
try {
- region.initialize(null);
+ region.mvcc.advanceTo(region.initialize(null));
if (majorCompact) {
region.compact(true);
} else {
@@ -7905,110 +8008,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
- * Do not change this sequence id. See {@link #sequenceId} comment.
+ * Do not change this sequence id.
* @return sequenceId
*/
@VisibleForTesting
- public AtomicLong getSequenceId() {
- return this.sequenceId;
- }
-
- /**
- * sets this region's sequenceId.
- * @param value new value
- */
- private void setSequenceId(long value) {
- this.sequenceId.set(value);
- }
-
- @VisibleForTesting class RowLockContext {
- private final HashedBytes row;
- private final CountDownLatch latch = new CountDownLatch(1);
- private final Thread thread;
- private int lockCount = 0;
-
- RowLockContext(HashedBytes row) {
- this.row = row;
- this.thread = Thread.currentThread();
- }
-
- boolean ownedByCurrentThread() {
- return thread == Thread.currentThread();
- }
-
- RowLock newLock() {
- lockCount++;
- RowLockImpl rl = new RowLockImpl();
- rl.setContext(this);
- return rl;
- }
-
- @Override
- public String toString() {
- Thread t = this.thread;
- return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
- ", lockCount=" + this.lockCount;
- }
-
- void releaseLock() {
- if (!ownedByCurrentThread()) {
- throw new IllegalArgumentException("Lock held by thread: " + thread
- + " cannot be released by different thread: " + Thread.currentThread());
- }
- lockCount--;
- if (lockCount == 0) {
- // no remaining locks by the thread, unlock and allow other threads to access
- RowLockContext existingContext = lockedRows.remove(row);
- if (existingContext != this) {
- throw new RuntimeException(
- "Internal row lock state inconsistent, should not happen, row: " + row);
- }
- latch.countDown();
- }
- }
+ public long getSequenceId() {
+ return this.mvcc.getReadPoint();
}
- public static class RowLockImpl implements RowLock {
- private RowLockContext context;
- private boolean released = false;
-
- @VisibleForTesting
- public RowLockContext getContext() {
- return context;
- }
-
- @VisibleForTesting
- public void setContext(RowLockContext context) {
- this.context = context;
- }
-
- @Override
- public void release() {
- if (!released) {
- context.releaseLock();
- }
- released = true;
- }
- }
/**
* Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
* the WALEdit append later.
* @param wal
- * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
- * be updated with right mvcc values(their wal sequence number)
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
- private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
+ private WALKey appendEmptyEdit(final WAL wal) throws IOException {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
@SuppressWarnings("deprecation")
- WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
- WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
+ getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
+ HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
+
// Call append but with an empty WALEdit. The returned sequence id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
- wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false,
- cells);
+ wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
return key;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 1341397..cfda1c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -651,7 +651,7 @@ public class HStore implements Store {
// readers might pick it up. This assumes that the store is not getting any writes (otherwise
// in-flight transactions might be made visible)
if (!toBeAddedFiles.isEmpty()) {
- region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
+ region.getMVCC().advanceTo(this.getMaxSequenceId());
}
// notify scanners, close file readers, and recompute store size
@@ -1308,7 +1308,7 @@ public class HStore implements Store {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
- this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
+ this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
}
@VisibleForTesting