You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/09 00:14:10 UTC
[30/32] hbase git commit: HBASE-15158 Change order in which we do
write pipeline operations; do all under row locks
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec92a8a7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f03c205..ac846b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -29,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -41,6 +41,7 @@ import java.util.NavigableSet;
import java.util.RandomAccess;
import java.util.Set;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -69,7 +70,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
@@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -93,7 +92,6 @@ import org.apache.hadoop.hbase.ShareableMemory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagRewriteCell;
-import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
@@ -112,7 +110,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
@@ -123,8 +121,6 @@ import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
@@ -148,6 +144,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -169,7 +166,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -199,6 +195,7 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.TextFormat;
+@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
private static final Log LOG = LogFactory.getLog(HRegion.class);
@@ -207,18 +204,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"hbase.hregion.scan.loadColumnFamiliesOnDemand";
/**
- * Longest time we'll wait on a sequenceid.
- * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
- * it without cleaning up previous usage properly; generally, a WAL roll is needed. The timeout
- * is for a latch in WALKey. There is no global accounting of outstanding WALKeys; intentionally
- * to avoid contention, but it makes it so if an abort or problem, we could be stuck waiting
- * on the WALKey latch. Revisit.
- */
- private final int maxWaitForSeqId;
- private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
- private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
-
- /**
* This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value.
*/
@@ -282,7 +267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final Counter checkAndMutateChecksPassed = new Counter();
final Counter checkAndMutateChecksFailed = new Counter();
- //Number of requests
+ // Number of requests
final Counter readRequestsCount = new Counter();
final Counter filteredReadRequestsCount = new Counter();
final Counter writeRequestsCount = new Counter();
@@ -357,7 +342,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
private boolean disallowWritesInRecovering = false;
- // when a region is in recovering state, it can only accept writes not reads
+ // When a region is in recovering state, it can only accept writes not reads
private volatile boolean recovering = false;
private volatile Optional<ConfigurationManager> configurationManager;
@@ -374,7 +359,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// We achieve this by synchronizing on the scannerReadPoints object.
synchronized(scannerReadPoints) {
minimumReadPoint = mvcc.getReadPoint();
-
for (Long readPoint: this.scannerReadPoints.values()) {
if (readPoint < minimumReadPoint) {
minimumReadPoint = readPoint;
@@ -674,7 +658,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
- this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
this.rsServices = rsServices;
@@ -1183,7 +1166,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public void setRecovering(boolean newState) {
boolean wasRecovering = this.recovering;
- // before we flip the recovering switch (enabling reads) we should write the region open
+ // Before we flip the recovering switch (enabling reads) we should write the region open
// event to WAL if needed
if (wal != null && getRegionServerServices() != null && !writestate.readOnly
&& wasRecovering && !newState) {
@@ -2051,7 +2034,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Should the store be flushed because it is old enough.
* <p>
- * Every FlushPolicy should call this to determine whether a store is old enough to flush(except
+ * Every FlushPolicy should call this to determine whether a store is old enough to flush (except
* that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
* returns true which will make a lot of flush requests.
*/
@@ -2152,19 +2135,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* for say installing a bulk loaded file just ahead of the last hfile that was
* the result of this flush, etc.
*
- * @param wal
- * Null if we're NOT to go via wal.
- * @param myseqid
- * The seqid to use if <code>wal</code> is null writing out flush
- * file.
- * @param storesToFlush
- * The list of stores to flush.
+ * @param wal Null if we're NOT to go via wal.
+ * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
+ * @param storesToFlush The list of stores to flush.
* @return object describing the flush's state
- * @throws IOException
- * general io exceptions
- * @throws DroppedSnapshotException
- * Thrown when replay of wal is required because a Snapshot was not
- * properly persisted.
+ * @throws IOException general io exceptions
+ * @throws DroppedSnapshotException Thrown when replay of WAL is required.
*/
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
@@ -2188,65 +2164,48 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new IOException("Aborting flush because server is aborted...");
}
final long startTime = EnvironmentEdgeManager.currentTime();
- // If nothing to flush, return, but we need to safely update the region sequence id
+ // If nothing to flush, return, but return with a valid unused sequenceId.
+ // It's needed by bulk upload IIRC. It flushes until no edits in memory so it can insert a
+ // bulk loaded file between memory and existing hfiles. It wants a good sequenceId that belongs
+ // to no other edit, which it can use to associate with the bulk load. Hence this little dance
+ // below to go get one.
if (this.memstoreSize.get() <= 0) {
- // Take an update lock because am about to change the sequence id and we want the sequence id
- // to be at the border of the empty memstore.
- MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+ // Take an update lock so no edits can come into memory just yet.
this.updatesLock.writeLock().lock();
+ WriteEntry writeEntry = null;
try {
if (this.memstoreSize.get() <= 0) {
// Presume that if there are still no edits in the memstore, then there are no edits for
// this region out in the WAL subsystem so no need to do any trickery clearing out
- // edits in the WAL system. Up the sequence number so the resulting flush id is for
- // sure just beyond the last appended region edit (useful as a marker when bulk loading,
- // etc.). NOTE: The writeEntry write number is NOT in the WAL.. there is no WAL writing
- // here.
+ // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
+ // sure just beyond the last appended region edit and not associated with any edit
+ // (useful as marker when bulk loading, etc.).
+ FlushResult flushResult = null;
if (wal != null) {
writeEntry = mvcc.begin();
long flushOpSeqId = writeEntry.getWriteNumber();
- FlushResult flushResult = new FlushResultImpl(
- FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- flushOpSeqId,
- "Nothing to flush",
- writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
- // TODO: Lets see if we hang here, if there is a scenario where an outstanding reader
- // with a read point is in advance of this write point.
+ flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ flushOpSeqId, "Nothing to flush",
+ writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
mvcc.completeAndWait(writeEntry);
+ // Set to null so we don't complete it again down in finally block.
writeEntry = null;
return new PrepareFlushResult(flushResult, myseqid);
} else {
- return new PrepareFlushResult(
- new FlushResultImpl(
- FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- "Nothing to flush",
- false),
- myseqid);
+ return new PrepareFlushResult(new FlushResultImpl(
+ FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush", false), myseqid);
}
}
} finally {
- this.updatesLock.writeLock().unlock();
if (writeEntry != null) {
+ // If writeEntry is non-null, this operation failed; the mvcc transaction failed...
+ // but complete it anyway so it doesn't block the mvcc queue.
mvcc.complete(writeEntry);
}
+ this.updatesLock.writeLock().unlock();
}
}
-
- if (LOG.isInfoEnabled()) {
- // Log a fat line detailing what is being flushed.
- StringBuilder perCfExtras = null;
- if (!isAllFamilies(storesToFlush)) {
- perCfExtras = new StringBuilder();
- for (Store store: storesToFlush) {
- perCfExtras.append("; ").append(store.getColumnFamilyName());
- perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
- }
- }
- LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
- " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
- ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
- ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
- }
+ logFatLineOnFlush(storesToFlush, myseqid);
// Stop updates while we snapshot the memstore of all of these regions' stores. We only have
// to do this for a moment. It is quick. We also set the memstore size to zero here before we
// allow updates again so its value will represent the size of the updates received
@@ -2257,8 +2216,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Obtaining lock to block concurrent updates");
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
- status.setStatus("Preparing to flush by snapshotting stores in " +
- getRegionInfo().getEncodedName());
+ status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
long totalFlushableSizeOfFlushableStores = 0;
Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
@@ -2280,109 +2238,117 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// will be in advance of this sequence id.
long flushedSeqId = HConstants.NO_SEQNUM;
byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
-
- long trxId = 0;
- MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
try {
- try {
- if (wal != null) {
- Long earliestUnflushedSequenceIdForTheRegion =
+ if (wal != null) {
+ Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
- if (earliestUnflushedSequenceIdForTheRegion == null) {
- // This should never happen. This is how startCacheFlush signals flush cannot proceed.
- String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
- status.setStatus(msg);
- return new PrepareFlushResult(
+ if (earliestUnflushedSequenceIdForTheRegion == null) {
+ // This should never happen. This is how startCacheFlush signals flush cannot proceed.
+ String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
+ status.setStatus(msg);
+ return new PrepareFlushResult(
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
myseqid);
- }
- flushOpSeqId = getNextSequenceId(wal);
- // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
- flushedSeqId =
- earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
- flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
- } else {
- // use the provided sequence Id as WAL is not being used for this flush.
- flushedSeqId = flushOpSeqId = myseqid;
}
+ flushOpSeqId = getNextSequenceId(wal);
+ // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
+ flushedSeqId =
+ earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
+ flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
+ } else {
+ // use the provided sequence Id as WAL is not being used for this flush.
+ flushedSeqId = flushOpSeqId = myseqid;
+ }
- for (Store s : storesToFlush) {
- totalFlushableSizeOfFlushableStores += s.getFlushableSize();
- storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
- committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
- storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
- }
+ for (Store s : storesToFlush) {
+ totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+ storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
+ committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
+ storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
+ }
- // write the snapshot start to WAL
- if (wal != null && !writestate.readOnly) {
- FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
+ // write the snapshot start to WAL
+ if (wal != null && !writestate.readOnly) {
+ FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- // no sync. Sync is below where we do not hold the updates lock
- trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, false, mvcc);
- }
-
- // Prepare flush (take a snapshot)
- for (StoreFlushContext flush : storeFlushCtxs.values()) {
- flush.prepare();
- }
- } catch (IOException ex) {
- if (wal != null) {
- if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
- try {
- FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
- getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, false, mvcc);
- } catch (Throwable t) {
- LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
- StringUtils.stringifyException(t));
- // ignore this since we will be aborting the RS with DSE.
- }
- }
- // we have called wal.startCacheFlush(), now we have to abort it
- wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
- throw ex; // let upper layers deal with it.
- }
- } finally {
- this.updatesLock.writeLock().unlock();
- }
- String s = "Finished memstore snapshotting " + this +
- ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
- status.setStatus(s);
- if (LOG.isTraceEnabled()) LOG.trace(s);
- // sync unflushed WAL changes
- // see HBASE-8208 for details
- if (wal != null) {
- try {
- wal.sync(); // ensure that flush marker is sync'ed
- } catch (IOException ioe) {
- wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
- throw ioe;
- }
+ // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
}
- // wait for all in-progress transactions to commit to WAL before
- // we can start the flush. This prevents
- // uncommitted transactions from being written into HFiles.
- // We have to block before we start the flush, otherwise keys that
- // were removed via a rollbackMemstore could be written to Hfiles.
- mvcc.completeAndWait(writeEntry);
- // set writeEntry to null to prevent mvcc.complete from being called again inside finally
- // block
- writeEntry = null;
- } finally {
- if (writeEntry != null) {
- // In case of failure just mark current writeEntry as complete.
- mvcc.complete(writeEntry);
+ // Prepare flush (take a snapshot)
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
+ flush.prepare();
}
+ } catch (IOException ex) {
+ doAbortFlushToWAL(wal, flushOpSeqId, committedFiles);
+ throw ex;
+ } finally {
+ this.updatesLock.writeLock().unlock();
}
+ String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " +
+ "flushsize=" + totalFlushableSizeOfFlushableStores;
+ status.setStatus(s);
+ doSyncOfUnflushedWALChanges(wal, getRegionInfo());
return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
}
/**
- * @param families
+ * Utility method broken out of internalPrepareFlushCache so that method is smaller.
+ */
+ private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) {
+ if (!LOG.isInfoEnabled()) {
+ return;
+ }
+ // Log a fat line detailing what is being flushed.
+ StringBuilder perCfExtras = null;
+ if (!isAllFamilies(storesToFlush)) {
+ perCfExtras = new StringBuilder();
+ for (Store store: storesToFlush) {
+ perCfExtras.append("; ").append(store.getColumnFamilyName());
+ perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
+ }
+ }
+ LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
+ " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
+ ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
+ ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
+ }
+
+ private void doAbortFlushToWAL(final WAL wal, final long flushOpSeqId,
+ final Map<byte[], List<Path>> committedFiles) {
+ if (wal == null) return;
+ try {
+ FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
+ getRegionInfo(), flushOpSeqId, committedFiles);
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false,
+ mvcc);
+ } catch (Throwable t) {
+ LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
+ StringUtils.stringifyException(t));
+ // ignore this since we will be aborting the RS with DSE.
+ }
+ // we have called wal.startCacheFlush(), now we have to abort it
+ wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+ }
+
+ /**
+ * Sync unflushed WAL changes. See HBASE-8208 for details
+ */
+ private static void doSyncOfUnflushedWALChanges(final WAL wal, final HRegionInfo hri)
+ throws IOException {
+ if (wal == null) {
+ return;
+ }
+ try {
+ wal.sync(); // ensure that flush marker is sync'ed
+ } catch (IOException ioe) {
+ wal.abortCacheFlush(hri.getEncodedNameAsBytes());
+ throw ioe;
+ }
+ }
+
+ /**
* @return True if passed Set is all families in the region.
*/
private boolean isAllFamilies(final Collection<Store> families) {
@@ -2400,8 +2366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
try {
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, true, mvcc);
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2471,8 +2436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, true, mvcc);
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@@ -2485,8 +2449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, false, mvcc);
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -2557,15 +2520,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@VisibleForTesting
protected long getNextSequenceId(final WAL wal) throws IOException {
- // TODO: For review. Putting an empty edit in to get a sequenceid out will not work if the
- // WAL is banjaxed... if it has gotten an exception and the WAL has not yet been rolled or
- // aborted. In this case, we'll just get stuck here. For now, until HBASE-12751, just have
- // a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
- // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
- // so if an abort or stop, there is no way to call them in.
- WALKey key = this.appendEmptyEdit(wal);
- mvcc.complete(key.getWriteEntry());
- return key.getSequenceId(this.maxWaitForSeqId);
+ WriteEntry we = mvcc.begin();
+ mvcc.completeAndWait(we);
+ return we.getWriteNumber();
}
//////////////////////////////////////////////////////////////////////////////
@@ -2754,13 +2711,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* accumulating status codes and tracking the index at which processing
* is proceeding.
*/
- private abstract static class BatchOperationInProgress<T> {
+ private abstract static class BatchOperation<T> {
T[] operations;
int nextIndexToProcess = 0;
OperationStatus[] retCodeDetails;
WALEdit[] walEditsFromCoprocessors;
- public BatchOperationInProgress(T[] operations) {
+ public BatchOperation(T[] operations) {
this.operations = operations;
this.retCodeDetails = new OperationStatus[operations.length];
this.walEditsFromCoprocessors = new WALEdit[operations.length];
@@ -2780,7 +2737,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- private static class MutationBatch extends BatchOperationInProgress<Mutation> {
+ private static class MutationBatch extends BatchOperation<Mutation> {
private long nonceGroup;
private long nonce;
public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
@@ -2820,7 +2777,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
+ private static class ReplayBatch extends BatchOperation<MutationReplay> {
private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) {
super(operations);
@@ -2906,7 +2863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
+ OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
boolean initialized = false;
Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
startRegionOperation(op);
@@ -2920,11 +2877,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!initialized) {
this.writeRequestsCount.add(batchOp.operations.length);
if (!batchOp.isInReplay()) {
- doPreMutationHook(batchOp);
+ doPreBatchMutateHook(batchOp);
}
initialized = true;
}
- long addedSize = doMiniBatchMutation(batchOp);
+ long addedSize = doMiniBatchMutate(batchOp);
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
if (isFlushSize(newSize)) {
requestFlush();
@@ -2936,8 +2893,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return batchOp.retCodeDetails;
}
-
- private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
+ private void doPreBatchMutateHook(BatchOperation<?> batchOp)
throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */
WALEdit walEdit = new WALEdit();
@@ -2976,103 +2932,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ /**
+ * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
+ * In here we also handle replay of edits on region recover.
+ * @return Change in size brought about by applying <code>batchOp</code>
+ */
@SuppressWarnings("unchecked")
- private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
- boolean isInReplay = batchOp.isInReplay();
- // variable to note if all Put items are for the same CF -- metrics related
+ // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120
+ private long doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
+ boolean replay = batchOp.isInReplay();
+ // Variable to note if all Put items are for the same CF -- metrics related
boolean putsCfSetConsistent = true;
- //The set of columnFamilies first seen for Put.
- Set<byte[]> putsCfSet = null;
- // variable to note if all Delete items are for the same CF -- metrics related
+ // Variable to note if all Delete items are for the same CF -- metrics related
boolean deletesCfSetConsistent = true;
- //The set of columnFamilies first seen for Delete.
+ // The set of columnFamilies first seen for Put.
+ Set<byte[]> putsCfSet = null;
+ // The set of columnFamilies first seen for Delete.
Set<byte[]> deletesCfSet = null;
-
- long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
- WALEdit walEdit = new WALEdit(isInReplay);
- MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
- long txid = 0;
- boolean doRollBackMemstore = false;
+ long currentNonceGroup = HConstants.NO_NONCE;
+ long currentNonce = HConstants.NO_NONCE;
+ WALEdit walEdit = new WALEdit(replay);
boolean locked = false;
-
- /** Keep track of the locks we hold so we can release them in finally clause */
- List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
boolean success = false;
- int noOfPuts = 0, noOfDeletes = 0;
- WALKey walKey = null;
- long mvccNum = 0;
+ int noOfPuts = 0;
+ int noOfDeletes = 0;
+ WriteEntry writeEntry = null;
+ /** Keep track of the locks we hold so we can release them in finally clause */
+ List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
try {
- // ------------------------------------
- // STEP 1. Try to acquire as many locks as we can, and ensure
- // we acquire at least one.
- // ----------------------------------
+ // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
int numReadyToWrite = 0;
long now = EnvironmentEdgeManager.currentTime();
while (lastIndexExclusive < batchOp.operations.length) {
- Mutation mutation = batchOp.getMutation(lastIndexExclusive);
- boolean isPutMutation = mutation instanceof Put;
-
- Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
- // store the family map reference to allow for mutations
- familyMaps[lastIndexExclusive] = familyMap;
-
- // skip anything that "ran" already
- if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
- lastIndexExclusive++;
- continue;
- }
-
- try {
- if (isPutMutation) {
- // Check the families in the put. If bad, skip this one.
- if (isInReplay) {
- removeNonExistentColumnFamilyForReplay(familyMap);
- } else {
- checkFamilies(familyMap.keySet());
- }
- checkTimestamps(mutation.getFamilyCellMap(), now);
- } else {
- prepareDelete((Delete) mutation);
- }
- checkRow(mutation.getRow(), "doMiniBatchMutation");
- } catch (NoSuchColumnFamilyException nscf) {
- LOG.warn("No such column family in batch mutation", nscf);
- batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
- OperationStatusCode.BAD_FAMILY, nscf.getMessage());
- lastIndexExclusive++;
- continue;
- } catch (FailedSanityCheckException fsce) {
- LOG.warn("Batch Mutation did not pass sanity check", fsce);
- batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
- OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
- lastIndexExclusive++;
- continue;
- } catch (WrongRegionException we) {
- LOG.warn("Batch mutation had a row that does not belong to this region", we);
- batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
- OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+ if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now)) {
lastIndexExclusive++;
continue;
}
-
- // If we haven't got any rows in our batch, we should block to
- // get the next one.
+ Mutation mutation = batchOp.getMutation(lastIndexExclusive);
+ // If we haven't got any rows in our batch, we should block to get the next one.
RowLock rowLock = null;
try {
rowLock = getRowLock(mutation.getRow(), true);
} catch (IOException ioe) {
- LOG.warn("Failed getting lock in batch put, row="
- + Bytes.toStringBinary(mutation.getRow()), ioe);
+ LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
}
if (rowLock == null) {
// We failed to grab another lock
- break; // stop acquiring more rows for this batch
+ break; // Stop acquiring more rows for this batch
} else {
acquiredRowLocks.add(rowLock);
}
@@ -3080,9 +2991,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
lastIndexExclusive++;
numReadyToWrite++;
- if (isPutMutation) {
+ if (mutation instanceof Put) {
// If Column Families stay consistent through out all of the
- // individual puts then metrics can be reported as a mutliput across
+ // individual puts then metrics can be reported as a multiput across
// column families in the first put.
if (putsCfSet == null) {
putsCfSet = mutation.getFamilyCellMap().keySet();
@@ -3100,23 +3011,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- // we should record the timestamp only after we have acquired the rowLock,
+ // We've now grabbed as many mutations off the list as we can
+
+ // STEP 2. Update any LATEST_TIMESTAMP timestamps
+ // We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
now = EnvironmentEdgeManager.currentTime();
byte[] byteNow = Bytes.toBytes(now);
// Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
- if (numReadyToWrite <= 0) return 0L;
-
- // We've now grabbed as many mutations off the list as we can
+ if (numReadyToWrite <= 0) {
+ return 0L;
+ }
- // ------------------------------------
- // STEP 2. Update any LATEST_TIMESTAMP timestamps
- // ----------------------------------
- for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
+ for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) continue;
+ != OperationStatusCode.NOT_RUN) {
+ // lastIndexExclusive was incremented above.
+ continue;
+ }
Mutation mutation = batchOp.getMutation(i);
if (mutation instanceof Put) {
@@ -3133,16 +3047,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
locked = true;
// calling the pre CP hook for batch mutation
- if (!isInReplay && coprocessorHost != null) {
+ if (!replay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
- if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
+ if (coprocessorHost.preBatchMutate(miniBatchOp)) {
+ return 0L;
+ }
}
- // ------------------------------------
// STEP 3. Build WAL edit
- // ----------------------------------
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
@@ -3160,26 +3074,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
continue;
}
- long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
+ long nonceGroup = batchOp.getNonceGroup(i);
+ long nonce = batchOp.getNonce(i);
// In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
// Given how nonces are originally written, these should be contiguous.
// They don't have to be, it will still work, just write more WALEdits than needed.
if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
- if (walEdit.size() > 0) {
- assert isInReplay;
- if (!isInReplay) {
- throw new IOException("Multiple nonces per batch and not in replay");
- }
- // txid should always increase, so having the one from the last call is ok.
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), now, m.getClusterIds(),
- currentNonceGroup, currentNonce, mvcc);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
- walEdit, true);
- walEdit = new WALEdit(isInReplay);
- walKey = null;
- }
+ // Write what we have so far for nonces out to WAL
+ appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);
+ walEdit = new WALEdit(replay);
currentNonceGroup = nonceGroup;
currentNonce = nonce;
}
@@ -3194,107 +3097,83 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
addFamilyMapToWALEdit(familyMaps[i], walEdit);
}
- // -------------------------
- // STEP 4. Append the final edit to WAL. Do not sync wal.
- // -------------------------
+ // STEP 4. Append the final edit to WAL and sync.
Mutation mutation = batchOp.getMutation(firstIndex);
- if (isInReplay) {
+ WALKey walKey = null;
+ if (replay) {
// use wal key from the original
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
- long replaySeqId = batchOp.getReplaySequenceId();
- walKey.setOrigLogSeqNum(replaySeqId);
- }
- if (walEdit.size() > 0) {
- if (!isInReplay) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
+ }
+ // Not sure what is going on here when replay is going on... does the below append get
+ // called for replayed edits? Am afraid to change it without test.
+ if (!walEdit.isEmpty()) {
+ if (!replay) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
}
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ // TODO: Use the doAppend methods below... complicated by the replay stuff above.
+ try {
+ long txid =
+ this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ if (txid != 0) sync(txid, durability);
+ writeEntry = walKey.getWriteEntry();
+ } catch (IOException ioe) {
+ if (walKey != null) mvcc.complete(walKey.getWriteEntry());
+ throw ioe;
+ }
}
- // ------------------------------------
- // Acquire the latest mvcc number
- // ----------------------------------
if (walKey == null) {
- // If this is a skip wal operation just get the read point from mvcc
- walKey = this.appendEmptyEdit(this.wal);
- }
- if (!isInReplay) {
- writeEntry = walKey.getWriteEntry();
- mvccNum = writeEntry.getWriteNumber();
- } else {
- mvccNum = batchOp.getReplaySequenceId();
+ // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid.
+ writeEntry = mvcc.begin();
}
- // ------------------------------------
// STEP 5. Write back to memstore
- // Write to memstore. It is ok to write to memstore
- // first without syncing the WAL because we do not roll
- // forward the memstore MVCC. The MVCC will be moved up when
- // the complete operation is done. These changes are not yet
- // visible to scanners till we update the MVCC. The MVCC is
- // moved only when the sync is complete.
- // ----------------------------------
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
+ if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue;
}
- doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
- addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,
+ replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
+ }
+
+ // STEP 6. Complete mvcc.
+ if (replay) {
+ this.mvcc.advanceTo(batchOp.getReplaySequenceId());
+ } else if (writeEntry != null/*Can be null if in replay mode*/) {
+ mvcc.completeAndWait(writeEntry);
+ writeEntry = null;
}
- // -------------------------------
- // STEP 6. Release row locks, etc.
- // -------------------------------
+ // STEP 7. Release row locks, etc.
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
releaseRowLocks(acquiredRowLocks);
- // -------------------------
- // STEP 7. Sync wal.
- // -------------------------
- if (txid != 0) {
- syncOrDefer(txid, durability);
- }
-
- doRollBackMemstore = false;
// calling the post CP hook for batch mutation
- if (!isInReplay && coprocessorHost != null) {
+ if (!replay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp);
}
- // ------------------------------------------------------------------
- // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
- // ------------------------------------------------------------------
- if (writeEntry != null) {
- mvcc.completeAndWait(writeEntry);
- writeEntry = null;
- } else if (isInReplay) {
- // ensure that the sequence id of the region is at least as big as orig log seq id
- mvcc.advanceTo(mvccNum);
- }
-
for (int i = firstIndex; i < lastIndexExclusive; i ++) {
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
}
}
- // ------------------------------------
- // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+ // STEP 8. Run coprocessor post hooks. This should be done after the wal is
// synced so that the coprocessor contract is adhered to.
- // ------------------------------------
- if (!isInReplay && coprocessorHost != null) {
+ if (!replay && coprocessorHost != null) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// only for successful puts
if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -3313,18 +3192,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
success = true;
return addedSize;
} finally {
- // if the wal sync was unsuccessful, remove keys from memstore
- if (doRollBackMemstore) {
- for (int j = 0; j < familyMaps.length; j++) {
- for(List<Cell> cells:familyMaps[j].values()) {
- rollbackMemstore(cells);
- }
- }
- if (writeEntry != null) mvcc.complete(writeEntry);
- } else if (writeEntry != null) {
- mvcc.completeAndWait(writeEntry);
- }
-
+ // Call complete rather than completeAndWait because we probably had error if walKey != null
+ if (writeEntry != null) mvcc.complete(writeEntry);
if (locked) {
this.updatesLock.readLock().unlock();
}
@@ -3369,6 +3238,88 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ private void appendCurrentNonces(final Mutation mutation, final boolean replay,
+ final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)
+ throws IOException {
+ if (walEdit.isEmpty()) return;
+ if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
+ WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
+ currentNonceGroup, currentNonce, mvcc);
+ this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ // Complete the mvcc transaction started down in append else it will block others
+ this.mvcc.complete(walKey.getWriteEntry());
+ }
+
+ private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive,
+ final Map<byte[], List<Cell>>[] familyMaps, final long now)
+ throws IOException {
+ boolean skip = false;
+ // Skip anything that "ran" already
+ if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
+ != OperationStatusCode.NOT_RUN) {
+ return true;
+ }
+ Mutation mutation = batchOp.getMutation(lastIndexExclusive);
+ Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
+ // store the family map reference to allow for mutations
+ familyMaps[lastIndexExclusive] = familyMap;
+
+ try {
+ if (mutation instanceof Put) {
+ // Check the families in the put. If bad, skip this one.
+ if (batchOp.isInReplay()) {
+ removeNonExistentColumnFamilyForReplay(familyMap);
+ } else {
+ checkFamilies(familyMap.keySet());
+ }
+ checkTimestamps(mutation.getFamilyCellMap(), now);
+ } else {
+ prepareDelete((Delete)mutation);
+ }
+ checkRow(mutation.getRow(), "doMiniBatchMutation");
+ } catch (NoSuchColumnFamilyException nscf) {
+ LOG.warn("No such column family in batch mutation", nscf);
+ batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
+ OperationStatusCode.BAD_FAMILY, nscf.getMessage());
+ skip = true;
+ } catch (FailedSanityCheckException fsce) {
+ LOG.warn("Batch Mutation did not pass sanity check", fsce);
+ batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
+ OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+ skip = true;
+ } catch (WrongRegionException we) {
+ LOG.warn("Batch mutation had a row that does not belong to this region", we);
+ batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
+ OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+ skip = true;
+ }
+ return skip;
+ }
+
+ /**
+ * During replay, there could exist column families which are removed between region server
+ * failure and replay
+ */
+ private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) {
+ List<byte[]> nonExistentList = null;
+ for (byte[] family : familyMap.keySet()) {
+ if (!this.htableDescriptor.hasFamily(family)) {
+ if (nonExistentList == null) {
+ nonExistentList = new ArrayList<byte[]>();
+ }
+ nonExistentList.add(family);
+ }
+ }
+ if (nonExistentList != null) {
+ for (byte[] family : nonExistentList) {
+ // Perhaps schema was changed between crash and replay
+ LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
+ familyMap.remove(family);
+ }
+ }
+ }
+
/**
* Returns effective durability from the passed durability and
* the table descriptor.
@@ -3377,93 +3328,82 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return d == Durability.USE_DEFAULT ? this.durability : d;
}
- //TODO, Think that gets/puts and deletes should be refactored a bit so that
- //the getting of the lock happens before, so that you would just pass it into
- //the methods. So in the case of checkAndMutate you could just do lockRow,
- //get, put, unlockRow or something
-
@Override
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
- CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
+ CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,
boolean writeToWAL)
throws IOException{
+ checkMutationType(mutation, row);
+ return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null,
+ mutation, writeToWAL);
+ }
+
+ @Override
+ public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
+ CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
+ boolean writeToWAL)
+ throws IOException {
+ return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, rm, null,
+ writeToWAL);
+ }
+
+ /**
+ * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
+ * switches in the few places where there is deviation.
+ */
+ private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
+ CompareOp compareOp, ByteArrayComparable comparator, RowMutations rowMutations,
+ Mutation mutation, boolean writeToWAL)
+ throws IOException {
+ // Could do the below checks but seems wacky with two callers only. Just comment out for now.
+ // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
+ // need these commented out checks.
+ // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");
+ // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");
checkReadOnly();
- //TODO, add check for value length or maybe even better move this to the
- //client if this becomes a global setting
+ // TODO, add check for value length also move this check to the client
checkResources();
- boolean isPut = w instanceof Put;
- if (!isPut && !(w instanceof Delete))
- throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
- "be Put or Delete");
- if (!Bytes.equals(row, w.getRow())) {
- throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
- "getRow must match the passed row");
- }
-
startRegionOperation();
try {
Get get = new Get(row);
checkFamily(family);
get.addColumn(family, qualifier);
-
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
- // wait for all previous transactions to complete (with lock held)
- mvcc.await();
try {
- if (this.getCoprocessorHost() != null) {
+ if (mutation != null && this.getCoprocessorHost() != null) {
+ // Call coprocessor.
Boolean processed = null;
- if (w instanceof Put) {
+ if (mutation instanceof Put) {
processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
- qualifier, compareOp, comparator, (Put) w);
- } else if (w instanceof Delete) {
+ qualifier, compareOp, comparator, (Put)mutation);
+ } else if (mutation instanceof Delete) {
processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
- qualifier, compareOp, comparator, (Delete) w);
+ qualifier, compareOp, comparator, (Delete)mutation);
}
if (processed != null) {
return processed;
}
}
+ // NOTE: We used to wait here until mvcc caught up: mvcc.await();
+ // Supposition is that now all changes are done under row locks, then when we go to read,
+ // we'll get the latest on this row.
List<Cell> result = get(get, false);
-
- boolean valueIsNull = comparator.getValue() == null ||
- comparator.getValue().length == 0;
+ boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
boolean matches = false;
long cellTs = 0;
if (result.size() == 0 && valueIsNull) {
matches = true;
- } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
- valueIsNull) {
+ } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
matches = true;
cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
cellTs = kv.getTimestamp();
int compareResult = CellComparator.compareValue(kv, comparator);
- switch (compareOp) {
- case LESS:
- matches = compareResult < 0;
- break;
- case LESS_OR_EQUAL:
- matches = compareResult <= 0;
- break;
- case EQUAL:
- matches = compareResult == 0;
- break;
- case NOT_EQUAL:
- matches = compareResult != 0;
- break;
- case GREATER_OR_EQUAL:
- matches = compareResult >= 0;
- break;
- case GREATER:
- matches = compareResult > 0;
- break;
- default:
- throw new RuntimeException("Unknown Compare op " + compareOp.name());
- }
+ matches = matches(compareOp, compareResult);
}
- //If matches put the new put or delete the new delete
+ // If matches put the new put or delete the new delete
if (matches) {
// We have acquired the row lock already. If the system clock is NOT monotonically
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
@@ -3472,16 +3412,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long now = EnvironmentEdgeManager.currentTime();
long ts = Math.max(now, cellTs); // ensure write is not eclipsed
byte[] byteTs = Bytes.toBytes(ts);
-
- if (w instanceof Put) {
- updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
+ if (mutation != null) {
+ if (mutation instanceof Put) {
+ updateCellTimestamps(mutation.getFamilyCellMap().values(), byteTs);
+ }
+ // And else 'delete' is not needed since it already does a second get, and sets the
+ // timestamp from get (see prepareDeleteTimestamps).
+ } else {
+ for (Mutation m: rowMutations.getMutations()) {
+ if (m instanceof Put) {
+ updateCellTimestamps(m.getFamilyCellMap().values(), byteTs);
+ }
+ }
+ // And else 'delete' is not needed since it already does a second get, and sets the
+ // timestamp from get (see prepareDeleteTimestamps).
+ }
+ // All edits for the given row (across all column families) must happen atomically.
+ if (mutation != null) {
+ doBatchMutate(mutation);
+ } else {
+ mutateRow(rowMutations);
}
- // else delete is not needed since it already does a second get, and sets the timestamp
- // from get (see prepareDeleteTimestamps).
-
- // All edits for the given row (across all column families) must
- // happen atomically.
- doBatchMutate(w);
this.checkAndMutateChecksPassed.increment();
return true;
}
@@ -3495,113 +3446,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- //TODO, Think that gets/puts and deletes should be refactored a bit so that
- //the getting of the lock happens before, so that you would just pass it into
- //the methods. So in the case of checkAndMutate you could just do lockRow,
- //get, put, unlockRow or something
+ private void checkMutationType(final Mutation mutation, final byte [] row)
+ throws DoNotRetryIOException {
+ boolean isPut = mutation instanceof Put;
+ if (!isPut && !(mutation instanceof Delete)) {
+ throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete");
+ }
+ if (!Bytes.equals(row, mutation.getRow())) {
+ throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's getRow must match");
+ }
+ }
- @Override
- public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
- CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
- boolean writeToWAL) throws IOException {
- checkReadOnly();
- //TODO, add check for value length or maybe even better move this to the
- //client if this becomes a global setting
- checkResources();
+ private boolean matches(final CompareOp compareOp, final int compareResult) {
+ boolean matches = false;
+ switch (compareOp) {
+ case LESS:
+ matches = compareResult < 0;
+ break;
+ case LESS_OR_EQUAL:
+ matches = compareResult <= 0;
+ break;
+ case EQUAL:
+ matches = compareResult == 0;
+ break;
+ case NOT_EQUAL:
+ matches = compareResult != 0;
+ break;
+ case GREATER_OR_EQUAL:
+ matches = compareResult >= 0;
+ break;
+ case GREATER:
+ matches = compareResult > 0;
+ break;
+ default:
+ throw new RuntimeException("Unknown Compare op " + compareOp.name());
+ }
+ return matches;
+ }
- startRegionOperation();
- try {
- Get get = new Get(row);
- checkFamily(family);
- get.addColumn(family, qualifier);
- // Lock row - note that doBatchMutate will relock this row if called
- RowLock rowLock = getRowLock(get.getRow());
- // wait for all previous transactions to complete (with lock held)
- mvcc.await();
- try {
- List<Cell> result = get(get, false);
-
- boolean valueIsNull = comparator.getValue() == null ||
- comparator.getValue().length == 0;
- boolean matches = false;
- long cellTs = 0;
- if (result.size() == 0 && valueIsNull) {
- matches = true;
- } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
- valueIsNull) {
- matches = true;
- cellTs = result.get(0).getTimestamp();
- } else if (result.size() == 1 && !valueIsNull) {
- Cell kv = result.get(0);
- cellTs = kv.getTimestamp();
- int compareResult = CellComparator.compareValue(kv, comparator);
- switch (compareOp) {
- case LESS:
- matches = compareResult < 0;
- break;
- case LESS_OR_EQUAL:
- matches = compareResult <= 0;
- break;
- case EQUAL:
- matches = compareResult == 0;
- break;
- case NOT_EQUAL:
- matches = compareResult != 0;
- break;
- case GREATER_OR_EQUAL:
- matches = compareResult >= 0;
- break;
- case GREATER:
- matches = compareResult > 0;
- break;
- default:
- throw new RuntimeException("Unknown Compare op " + compareOp.name());
- }
- }
- //If matches put the new put or delete the new delete
- if (matches) {
- // We have acquired the row lock already. If the system clock is NOT monotonically
- // non-decreasing (see HBASE-14070) we should make sure that the mutation has a
- // larger timestamp than what was observed via Get. doBatchMutate already does this, but
- // there is no way to pass the cellTs. See HBASE-14054.
- long now = EnvironmentEdgeManager.currentTime();
- long ts = Math.max(now, cellTs); // ensure write is not eclipsed
- byte[] byteTs = Bytes.toBytes(ts);
-
- for (Mutation w : rm.getMutations()) {
- if (w instanceof Put) {
- updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
- }
- // else delete is not needed since it already does a second get, and sets the timestamp
- // from get (see prepareDeleteTimestamps).
- }
-
- // All edits for the given row (across all column families) must
- // happen atomically.
- mutateRow(rm);
- this.checkAndMutateChecksPassed.increment();
- return true;
- }
- this.checkAndMutateChecksFailed.increment();
- return false;
- } finally {
- rowLock.release();
- }
- } finally {
- closeRegionOperation();
- }
- }
-
- private void doBatchMutate(Mutation mutation) throws IOException {
- // Currently this is only called for puts and deletes, so no nonces.
- OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
- if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
- throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
- } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
- throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
- }
- }
+ private void doBatchMutate(Mutation mutation) throws IOException {
+ // Currently this is only called for puts and deletes, so no nonces.
+ OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
+ if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
+ throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
+ } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
+ throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+ }
+ }
/**
* Complete taking the snapshot on the region. Writes the region info and adds references to the
@@ -3663,40 +3555,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
// Check if we have any work to do and early out otherwise
// Update these checks as more logic is added here
-
if (m.getTTL() == Long.MAX_VALUE) {
return;
}
// From this point we know we have some work to do
-
for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess;
int listSize = cells.size();
for (int i = 0; i < listSize; i++) {
Cell cell = cells.get(i);
- List<Tag> newTags = new ArrayList<Tag>();
- Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
-
- // Carry forward existing tags
-
- while (tagIterator.hasNext()) {
-
- // Add any filters or tag specific rewrites here
-
- newTags.add(tagIterator.next());
- }
-
- // Cell TTL handling
-
- // Check again if we need to add a cell TTL because early out logic
- // above may change when there are more tag based features in core.
- if (m.getTTL() != Long.MAX_VALUE) {
- // Add a cell TTL tag
- newTags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
- }
-
+ List<Tag> newTags = TagUtil.carryForwardTags(null, cell);
+ newTags = TagUtil.carryForwardTTLTag(newTags, m.getTTL());
// Rewrite the cell with the updated set of tags
cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags)));
}
@@ -3772,49 +3643,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* should already have locked updatesLock.readLock(). This also does
* <b>not</b> check the families for validity.
*
- * @param familyMap Map of kvs per family
- * @param mvccNum The MVCC for this transaction.
- * @param isInReplay true when adding replayed KVs into memstore
- * @return the additional memory usage of the memstore caused by the
- * new entries.
+ * @param familyMap Map of Cells by family
+ * @return the additional memory usage of the memstore caused by the new entries.
*/
- private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
- long mvccNum, boolean isInReplay) throws IOException {
+ private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, boolean replay,
+ long sequenceId)
+ throws IOException {
long size = 0;
-
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess;
- Store store = getStore(family);
- int listSize = cells.size();
- for (int i=0; i < listSize; i++) {
+ size += applyToMemstore(getStore(family), cells, false, replay, sequenceId);
+ }
+ return size;
+ }
+
+ /**
+ * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
+ * set; when set we will run operations that make sense in the increment/append scenario but
+ * that do not make sense otherwise.
+ * @return Memstore change in size on insert of these Cells.
+ * @see #applyToMemstore(Store, Cell, long)
+ */
+ private long applyToMemstore(final Store store, final List<Cell> cells,
+ final boolean delta, boolean replay, long sequenceId)
+ throws IOException {
+ // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
+ long size = 0;
+ boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
+ int count = cells.size();
+ if (upsert) {
+ size += store.upsert(cells, getSmallestReadPoint());
+ } else {
+ for (int i = 0; i < count; i++) {
Cell cell = cells.get(i);
- if (cell.getSequenceId() == 0 || isInReplay) {
- CellUtil.setSequenceId(cell, mvccNum);
+ // TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack
+ // When is it zero anyways? When replay? Then just rely on that flag.
+ if (cell.getSequenceId() == 0 || replay) {
+ CellUtil.setSequenceId(cell, sequenceId);
}
size += store.add(cell);
}
}
-
- return size;
- }
+ return size;
+ }
/**
- * Remove all the keys listed in the map from the memstore. This method is
- * called when a Put/Delete has updated memstore but subsequently fails to update
- * the wal. This method is then invoked to rollback the memstore.
+ * @return Memstore change in size on insert of these Cells.
+ * @see #applyToMemstore(Store, List, boolean, boolean, long)
*/
- private void rollbackMemstore(List<Cell> memstoreCells) {
- int kvsRolledback = 0;
-
- for (Cell cell : memstoreCells) {
- byte[] family = CellUtil.cloneFamily(cell);
- Store store = getStore(family);
- store.rollback(cell);
- kvsRolledback++;
+ private long applyToMemstore(final Store store, final Cell cell, long sequenceId)
+ throws IOException {
+ // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
+ if (store == null) {
+ checkFamily(CellUtil.cloneFamily(cell));
+ // Unreachable because checkFamily will throw exception
}
- LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
+ return store.add(cell);
}
@Override
@@ -3824,30 +3710,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- /**
- * During replay, there could exist column families which are removed between region server
- * failure and replay
- */
- private void removeNonExistentColumnFamilyForReplay(
- final Map<byte[], List<Cell>> familyMap) {
- List<byte[]> nonExistentList = null;
- for (byte[] family : familyMap.keySet()) {
- if (!this.htableDescriptor.hasFamily(family)) {
- if (nonExistentList == null) {
- nonExistentList = new ArrayList<byte[]>();
- }
- nonExistentList.add(family);
- }
- }
- if (nonExistentList != null) {
- for (byte[] family : nonExistentList) {
- // Perhaps schema was changed between crash and replay
- LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
- familyMap.remove(family);
- }
- }
- }
-
@Override
public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
throws FailedSanityCheckException {
@@ -5490,12 +5352,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return true;
} finally {
if (wal != null && !storeFiles.isEmpty()) {
- // write a bulk load event when not all hfiles are loaded
+ // Write a bulk load event when not all hfiles are loaded
try {
WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
- WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
if (this.rsServices != null) {
@@ -5593,7 +5455,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// getSmallestReadPoint, before scannerReadPoints is updated.
IsolationLevel isolationLevel = scan.getIsolationLevel();
synchronized(scannerReadPoints) {
- this.readPt = getReadpoint(isolationLevel);
+ this.readPt = getReadPoint(isolationLevel);
scannerReadPoints.put(this, this.readPt);
}
@@ -5758,7 +5620,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// As the data is obtained from two independent heaps, we need to
// ensure that result list is sorted, because Result relies on that.
- Collections.sort(results, comparator);
+ sort(results, comparator);
return moreValues;
}
@@ -6876,7 +6738,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
long nonceGroup, long nonce) throws IOException {
-
for (byte[] row : processor.getRowsToLock()) {
checkRow(row, "processRowsWithLocks");
}
@@ -6884,23 +6745,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkReadOnly();
}
checkResources();
-
startRegionOperation();
WALEdit walEdit = new WALEdit();
- // 1. Run pre-process hook
- try {
- processor.preProcess(this, walEdit);
- } catch (IOException e) {
- closeRegionOperation();
- throw e;
- }
+ // STEP 1. Run pre-process hook
+ preProcess(processor, walEdit);
// Short circuit the read only case
if (processor.readOnly()) {
try {
long now = EnvironmentEdgeManager.currentTime();
- doProcessRowWithTimeout(
- processor, now, this, null, null, timeout);
+ doProcessRowWithTimeout(processor, now, this, null, null, timeout);
processor.postProcess(this, walEdit, true);
} finally {
closeRegionOperation();
@@ -6908,118 +6762,81 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return;
}
- MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
boolean locked;
- boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
long addedSize = 0;
List<Mutation> mutations = new ArrayList<Mutation>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
- long mvccNum = 0;
- WALKey walKey = null;
+ // This is assigned by mvcc either explicitly in the below or in the guts of the WAL append
+ // when it assigns the edit a sequence id (a.k.a. the mvcc write number).
+ WriteEntry writeEntry = null;
try {
- // 2. Acquire the row lock(s)
+ // STEP 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
// use a writer lock for mixed reads and writes
acquiredRowLocks.add(getRowLock(row));
}
- // 3. Region lock
+ // STEP 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
locked = true;
-
+ boolean success = false;
long now = EnvironmentEdgeManager.currentTime();
try {
- // 4. Let the processor scan the rows, generate mutations and add
- // waledits
- doProcessRowWithTimeout(
- processor, now, this, mutations, walEdit, timeout);
-
+ // STEP 4. Let the processor scan the rows, generate mutations and add waledits
+ doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
-
- // 5. Call the preBatchMutate hook
+ // STEP 5. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);
- long txid = 0;
- // 6. Append no sync
+ // STEP 6. Append and sync if walEdit has data to write out.
if (!walEdit.isEmpty()) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- processor.getClusterIds(), nonceGroup, nonce, mvcc);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
- walKey, walEdit, false);
- }
- if(walKey == null){
- // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
- // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendEmptyEdit(this.wal);
+ writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
+ processor.getClusterIds(), now, nonceGroup, nonce);
+ } else {
+ // We are here if WAL is being skipped.
+ writeEntry = this.mvcc.begin();
}
- // 7. Start mvcc transaction
- writeEntry = walKey.getWriteEntry();
- mvccNum = walKey.getSequenceId();
-
-
-
- // 8. Apply to memstore
+ // STEP 7. Apply to memstore
+ long sequenceId = writeEntry.getWriteNumber();
for (Mutation m : mutations) {
- // Handle any tag based cell features
+ // Handle any tag based cell features.
+ // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before
+ // so tags go into WAL?
rewriteCellTags(m.getFamilyCellMap(), m);
-
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
- CellUtil.setSequenceId(cell, mvccNum);
- Store store = getStore(cell);
- if (store == null) {
- checkFamily(CellUtil.cloneFamily(cell));
- // unreachable
+ if (walEdit.isEmpty()) {
+ // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
+ // If no WAL, need to stamp it here.
+ CellUtil.setSequenceId(cell, sequenceId);
}
- addedSize += store.add(cell);
+ Store store = getStore(cell);
+ addedSize += applyToMemstore(store, cell, sequenceId);
}
}
+ // STEP 8. Complete mvcc.
+ mvcc.completeAndWait(writeEntry);
+ writeEntry = null;
- // 9. Release region lock
+ // STEP 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
- // 10. Release row lock(s)
+ // STEP 10. Release row lock(s)
releaseRowLocks(acquiredRowLocks);
- // 11. Sync edit log
- if (txid != 0) {
- syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
- }
- walSyncSuccessful = true;
- // 12. call postBatchMutate hook
+ // STEP 11. call postBatchMutate hook
processor.postBatchMutate(this);
}
+ success = true;
} finally {
- // TODO: Make this method look like all other methods that are doing append/sync and
- // memstore rollback such as append and doMiniBatchMutation. Currently it is a little
- // different. Make them all share same code!
- if (!mutations.isEmpty() && !walSyncSuccessful) {
- LOG.warn("Wal sync failed. Roll back " + mutations.size() +
- " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
- processor.getRowsToLock().iterator().next()) + "...");
- for (Mutation m : mutations) {
- for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
- Cell cell = cellScanner.current();
- getStore(cell).rollback(cell);
- }
- }
- if (writeEntry != null) {
- mvcc.complete(writeEntry);
- writeEntry = null;
- }
- }
- // 13. Roll mvcc forward
- if (writeEntry != null) {
- mvcc.completeAndWait(writeEntry);
- }
+ // Call complete rather than completeAndWait because we probably had error if walKey != null
+ if (writeEntry != null) mvcc.complete(writeEntry);
if (locked) {
this.updatesLock.readLock().unlock();
}
@@ -7027,18 +6844,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
releaseRowLocks(acquiredRowLocks);
}
- // 14. Run post-process hook
- processor.postProcess(this, walEdit, walSyncSuccessful);
-
+ // 12. Run post-process hook
+ processor.postProcess(this, walEdit, success);
} finally {
closeRegionOperation();
- if (!mutations.isEmpty() &&
- isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
+ if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
requestFlush();
}
}
}
+ private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit)
+ throws IOException {
+ try {
+ processor.preProcess(this, walEdit);
+ } catch (IOException e) {
+ closeRegionOperation();
+ throw e;
+ }
+ }
+
private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
final long now,
final HRegion region,
@@ -7089,500 +6914,400 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- /**
- * @return The passed-in {@code tags} but with the tags from {@code cell} added.
- */
- private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
- if (cell.getTagsLength() <= 0) return tags;
- List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags;
- Iterator<Tag> i = CellUtil.tagsIterator(cell);
- while (i.hasNext()) newTags.add(i.next());
- return newTags;
+ public Result append(Append append) throws IOException {
+ return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- /**
- * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.
- * @return Get result.
- */
- private List<Cell> doGet(final Store store, final byte [] row,
- final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
- throws IOException {
- // Sort the cells so that they match the order that they
- // appear in the Get results. Otherwise, we won't be able to
- // find the existing values if the cells are not specified
- // in order by the client since cells are in an array list.
- Collections.sort(family.getValue(), store.getComparator());
- // Get previous values for all columns in this family
- Get get = new Get(row);
- for (Cell cell : family.getValue()) {
- get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
- }
- if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
- return get(get, false);
+ @Override
+ public Result append(Append mutation, long nonceGroup, long nonce) throws IOException {
+ return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults());
}
- public Result append(Append append) throws IOException {
- return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ public Result increment(Increment increment) throws IOException {
+ return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- // TODO: There's a lot of boiler plate code identical to increment.
- // We should refactor append and increment as local get-mutate-put
- // transactions, so all stores only go through one code path for puts.
-
@Override
- public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
- Operation op = Operation.APPEND;
- byte[] row = mutate.getRow();
- checkRow(row, op.toString());
- checkFamilies(mutate.getFamilyCellMap().keySet());
- boolean flush = false;
- Durability durability = getEffectiveDurability(mutate.getDurability());
- boolean writeToWAL = durability != Durability.SKIP_WAL;
- WALEdit walEdits = null;
- List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
- Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
- long size = 0;
- long txid = 0;
+ public Result increment(Increment mutation, long nonceGroup, long nonce)
+ throws IOException {
+ return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults());
+ }
+
+ /**
+ * Add "deltas" to Cells. Deltas are increments or appends. Switch on <code>op</code>.
+ *
+ * <p>If increment, add deltas to current values or if an append, then
+ * append the deltas to the current Cell values.
+ *
+ * <p>Append and Increment code paths are mostly the same. They differ in just a few pl
<TRUNCATED>