You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/21 22:36:25 UTC
svn commit: r1436633 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/util/
hbase-common/src/test/java/org/apache/hadoop/hbase/util/
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/
hbase-server/src/main/java/org/apach...
Author: tedyu
Date: Mon Jan 21 21:36:25 2013
New Revision: 1436633
URL: http://svn.apache.org/viewvc?rev=1436633&view=rev
Log:
HBASE-7329 Revert due to compilation error
Removed:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Mon Jan 21 21:36:25 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
Delete del = null;
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
- // filtering HLog meta entries
+ // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jan 21 21:36:25 2013
@@ -1553,26 +1553,17 @@ public class HRegion implements HeapSize
long flushsize = this.memstoreSize.get();
status.setStatus("Preparing to flush by snapshotting stores");
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
- long flushSeqId = -1L;
+ long completeSeqId = -1L;
try {
// Record the mvcc for all transactions in progress.
w = mvcc.beginMemstoreInsert();
mvcc.advanceMemstore(w);
- if (wal != null) {
- Long startSeqId = wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
- if (startSeqId == null) {
- status.setStatus("Flush will not be started for [" + this.regionInfo.getEncodedName()
- + "] - WAL is going away");
- return false;
- }
- flushSeqId = startSeqId.longValue();
- } else {
- flushSeqId = myseqid;
- }
-
+ sequenceId = (wal == null)? myseqid:
+ wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+ completeSeqId = this.getCompleteCacheFlushSequenceId(sequenceId);
for (Store s : stores.values()) {
- storeFlushers.add(s.getStoreFlusher(flushSeqId));
+ storeFlushers.add(s.getStoreFlusher(completeSeqId));
}
// prepare flush (take a snapshot)
@@ -1641,14 +1632,22 @@ public class HRegion implements HeapSize
throw dse;
}
- // If we get to here, the HStores have been written.
+ // If we get to here, the HStores have been written. If we get an
+ // error in completeCacheFlush it will release the lock it is holding
+
+ // B. Write a FLUSHCACHE-COMPLETE message to the log.
+ // This tells future readers that the HStores were emitted correctly,
+ // and that all updates to the log for this regionName that have lower
+ // log-sequence-ids can be safely ignored.
if (wal != null) {
- wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+ wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
+ regionInfo.getTableName(), completeSeqId,
+ this.getRegionInfo().isMetaRegion());
}
// Update the last flushed sequence id for region
if (this.rsServices != null) {
- completeSequenceId = flushSeqId;
+ completeSequenceId = completeSeqId;
}
// C. Finally notify anyone waiting on memstore to clear:
@@ -1673,6 +1672,18 @@ public class HRegion implements HeapSize
return compactionRequested;
}
+ /**
+ * Get the sequence number to be associated with this cache flush. Used by
+ * TransactionalRegion to not complete pending transactions.
+ *
+ *
+ * @param currentSequenceId
+ * @return sequence id to complete the cache flush with
+ */
+ protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
+ return currentSequenceId;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Mon Jan 21 21:36:25 2013
@@ -24,10 +24,8 @@ import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,7 +57,6 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
@@ -135,45 +132,23 @@ class FSHLog implements HLog, Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
- /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
- private DrainBarrier closeBarrier = new DrainBarrier();
-
- /**
+ /*
* Current log file.
*/
Writer writer;
- /**
+ /*
* Map of all log files but the current one.
*/
final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
-
- /**
- * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
- * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
- * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
- * force flush them, so we don't care about these numbers messing with anything. */
- private final Object oldestSeqNumsLock = new Object();
-
- /**
- * This lock makes sure only one log roll runs at the same time. Should not be taken while
- * any other lock is held. We don't just use synchronized because that results in bogus and
- * tedious findbugs warning when it thinks synchronized controls writer thread safety */
- private final Object rollWriterLock = new Object();
-
- /**
- * Map of encoded region names to their most recent sequence/edit id in their memstore.
+ /*
+ * Map of encoded region names to their most recent sequence/edit id in their
+ * memstore.
*/
- private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
+ private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
- /**
- * Map of encoded region names to their most recent sequence/edit id in their memstore;
- * contains the regions that are currently flushing. That way we can store two numbers for
- * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
- */
- private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();
private volatile boolean closed = false;
@@ -203,6 +178,10 @@ class FSHLog implements HLog, Syncable {
// of the default Hdfs block size.
private final long logrollsize;
+ // This lock prevents starting a log roll during a cache flush.
+ // synchronized is insufficient because a cache flush spans two method calls.
+ private final Lock cacheFlushLock = new ReentrantLock();
+
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
// locked during appends
@@ -493,77 +472,88 @@ class FSHLog implements HLog, Syncable {
@Override
public byte [][] rollWriter(boolean force)
throws FailedLogCloseException, IOException {
- synchronized (rollWriterLock) {
- // Return if nothing to flush.
- if (!force && this.writer != null && this.numEntries.get() <= 0) {
- return null;
+ // Return if nothing to flush.
+ if (!force && this.writer != null && this.numEntries.get() <= 0) {
+ return null;
+ }
+ byte [][] regionsToFlush = null;
+ this.cacheFlushLock.lock();
+ try {
+ this.logRollRunning = true;
+ if (closed) {
+ LOG.debug("HLog closed. Skipping rolling of writer");
+ return regionsToFlush;
+ }
+ // Do all the preparation outside of the updateLock so that incoming
+ // writes are blocked as little as possible
+ long currentFilenum = this.filenum;
+ Path oldPath = null;
+ if (currentFilenum > 0) {
+ //computeFilename will take care of meta hlog filename
+ oldPath = computeFilename(currentFilenum);
}
- byte [][] regionsToFlush = null;
- try {
- this.logRollRunning = true;
- boolean isClosed = closed;
- if (isClosed || !closeBarrier.beginOp()) {
- LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
- return regionsToFlush;
- }
- // Do all the preparation outside of the updateLock to block
- // as less as possible the incoming writes
- long currentFilenum = this.filenum;
- Path oldPath = null;
- if (currentFilenum > 0) {
- //computeFilename will take care of meta hlog filename
- oldPath = computeFilename(currentFilenum);
- }
- this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
-
- // Tell our listeners that a new log is about to be created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogRoll(oldPath, newPath);
- }
+ this.filenum = System.currentTimeMillis();
+ Path newPath = computeFilename();
+
+ // Tell our listeners that a new log is about to be created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.preLogRoll(oldPath, newPath);
}
- FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
- FSDataOutputStream nextHdfsOut = null;
- if (nextWriter instanceof SequenceFileLogWriter) {
- nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
- }
-
- Path oldFile = null;
- int oldNumEntries = 0;
- synchronized (updateLock) {
- // Clean up current writer.
- oldNumEntries = this.numEntries.get();
- oldFile = cleanupCurrentWriter(currentFilenum);
- this.writer = nextWriter;
- this.hdfs_out = nextHdfsOut;
- this.numEntries.set(0);
- }
- LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
- + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
- : "" ) + "; new path=" + FSUtils.getPath(newPath));
-
- // Tell our listeners that a new log was created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogRoll(oldPath, newPath);
- }
+ }
+ FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+ // Can we get at the dfsclient outputstream? If an instance of
+ // SFLW, it'll have done the necessary reflection to get at the
+ // protected field name.
+ FSDataOutputStream nextHdfsOut = null;
+ if (nextWriter instanceof SequenceFileLogWriter) {
+ nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+ }
+
+ synchronized (updateLock) {
+ // Clean up current writer.
+ Path oldFile = cleanupCurrentWriter(currentFilenum);
+ this.writer = nextWriter;
+ this.hdfs_out = nextHdfsOut;
+
+ LOG.info((oldFile != null?
+ "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+ this.numEntries.get() +
+ ", filesize=" +
+ this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+ " for " + FSUtils.getPath(newPath));
+ this.numEntries.set(0);
+ }
+ // Tell our listeners that a new log was created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.postLogRoll(oldPath, newPath);
}
+ }
- // Can we delete any of the old log files?
- if (getNumLogFiles() > 0) {
- cleanOldLogs();
- regionsToFlush = getRegionsToForceFlush();
+ // Can we delete any of the old log files?
+ if (this.outputfiles.size() > 0) {
+ if (this.lastSeqWritten.isEmpty()) {
+ LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+ // If so, then no new writes have come in since all regions were
+ // flushed (and removed from the lastSeqWritten map). Means can
+ // remove all but currently open log file.
+ for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+ archiveLogFile(e.getValue(), e.getKey());
+ }
+ this.outputfiles.clear();
+ } else {
+ regionsToFlush = cleanOldLogs();
}
- } finally {
+ }
+ } finally {
+ try {
this.logRollRunning = false;
- closeBarrier.endOp();
+ } finally {
+ this.cacheFlushLock.unlock();
}
- return regionsToFlush;
}
+ return regionsToFlush;
}
/**
@@ -591,64 +581,36 @@ class FSHLog implements HLog, Syncable {
* encoded region names to flush.
* @throws IOException
*/
- private void cleanOldLogs() throws IOException {
- long oldestOutstandingSeqNum = Long.MAX_VALUE;
- synchronized (oldestSeqNumsLock) {
- Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
- ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
- Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
- ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
- oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
- }
-
+ private byte [][] cleanOldLogs() throws IOException {
+ Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
// Get the set of all log files whose last sequence number is smaller than
// the oldest edit's sequence number.
TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
oldestOutstandingSeqNum).keySet());
// Now remove old log files (if any)
- if (LOG.isDebugEnabled()) {
- if (sequenceNumbers.size() > 0) {
- LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
+ int logsToRemove = sequenceNumbers.size();
+ if (logsToRemove > 0) {
+ if (LOG.isDebugEnabled()) {
+ // Find associated region; helps debugging.
+ byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
+ LOG.debug("Found " + logsToRemove + " hlogs to remove" +
" out of total " + this.outputfiles.size() + ";" +
- " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
+ " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
+ " from region " + Bytes.toStringBinary(oldestRegion));
}
- }
- for (Long seq : sequenceNumbers) {
- archiveLogFile(this.outputfiles.remove(seq), seq);
- }
- }
-
- /**
- * Return regions that have edits that are equal or less than a certain sequence number.
- * Static due to some old unit test.
- * @param walSeqNum The sequence number to compare with.
- * @param regionsToSeqNums Encoded region names to sequence ids
- * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
- */
- static byte[][] findMemstoresWithEditsEqualOrOlderThan(
- final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
- List<byte[]> regions = null;
- for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
- if (e.getValue().longValue() <= walSeqNum) {
- if (regions == null) regions = new ArrayList<byte[]>();
- regions.add(e.getKey());
+ for (Long seq : sequenceNumbers) {
+ archiveLogFile(this.outputfiles.remove(seq), seq);
}
}
- return regions == null ? null : regions
- .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
- }
- private byte[][] getRegionsToForceFlush() throws IOException {
// If too many log files, figure which regions we need to flush.
// Array is an array of encoded region names.
byte [][] regions = null;
- int logCount = getNumLogFiles();
+ int logCount = this.outputfiles.size();
if (logCount > this.maxLogs && logCount > 0) {
// This is an array of encoded region names.
- synchronized (oldestSeqNumsLock) {
- regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
- this.oldestUnflushedSeqNums);
- }
+ regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
+ this.lastSeqWritten);
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
@@ -664,6 +626,29 @@ class FSHLog implements HLog, Syncable {
}
/*
+ * @return Logs older than this id are safe to remove.
+ */
+ private Long getOldestOutstandingSeqNum() {
+ return Collections.min(this.lastSeqWritten.values());
+ }
+
+ /**
+ * @param oldestOutstandingSeqNum
+ * @return (Encoded) name of oldest outstanding region.
+ */
+ private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
+ byte [] oldestRegion = null;
+ for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
+ if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
+ // Key is encoded region name.
+ oldestRegion = e.getKey();
+ break;
+ }
+ }
+ return oldestRegion;
+ }
+
+ /*
* Cleans up current writer closing and adding to outputfiles.
* Presumes we're operating inside an updateLock scope.
* @return Path to current writer or null if none.
@@ -795,39 +780,33 @@ class FSHLog implements HLog, Syncable {
@Override
public void close() throws IOException {
- if (this.closed) {
- return;
- }
try {
logSyncerThread.close();
// Make sure we synced everything
logSyncerThread.join(this.optionalFlushInterval*2);
} catch (InterruptedException e) {
LOG.error("Exception while waiting for syncer thread to die", e);
- Thread.currentThread().interrupt();
- }
- try {
- // Prevent all further flushing and rolling.
- closeBarrier.stopAndDrainOps();
- } catch (InterruptedException e) {
- LOG.error("Exception while waiting for cache flushes and log rolls", e);
- Thread.currentThread().interrupt();
}
- // Tell our listeners that the log is closing
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.logCloseRequested();
- }
- }
- synchronized (updateLock) {
- this.closed = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("closing hlog writer in " + this.dir.toString());
+ cacheFlushLock.lock();
+ try {
+ // Tell our listeners that the log is closing
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.logCloseRequested();
+ }
}
- if (this.writer != null) {
- this.writer.close();
+ synchronized (updateLock) {
+ this.closed = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing hlog writer in " + this.dir.toString());
+ }
+ if (this.writer != null) {
+ this.writer.close();
+ }
}
+ } finally {
+ cacheFlushLock.unlock();
}
}
@@ -859,7 +838,7 @@ class FSHLog implements HLog, Syncable {
// memstore). When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
- this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+ this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
Long.valueOf(seqNum));
doWrite(regionInfo, logKey, logEdit, htd);
txid = this.unflushedEntries.incrementAndGet();
@@ -931,7 +910,7 @@ class FSHLog implements HLog, Syncable {
// Use encoded name. Its shorter, guaranteed unique and a subset of
// actual name.
byte [] encodedRegionName = info.getEncodedNameAsBytes();
- this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+ this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
@@ -1063,11 +1042,7 @@ class FSHLog implements HLog, Syncable {
Writer tempWriter;
synchronized (this.updateLock) {
if (this.closed) return;
- // Guaranteed non-null.
- // Note that parallel sync can close tempWriter.
- // The current method of dealing with this is to catch exceptions.
- // See HBASE-4387, HBASE-5623, HBASE-7329.
- tempWriter = this.writer;
+ tempWriter = this.writer; // guaranteed non-null
}
// if the transaction that we are interested in is already
// synced, then return immediately.
@@ -1103,11 +1078,9 @@ class FSHLog implements HLog, Syncable {
}
try {
tempWriter.sync();
- } catch(IOException ex) {
+ } catch(IOException io) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
- // TODO: we don't actually need to do it for concurrent close - what is the point
- // of syncing new unrelated writer? Keep behavior for now.
tempWriter = this.writer;
tempWriter.sync();
}
@@ -1115,9 +1088,6 @@ class FSHLog implements HLog, Syncable {
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
- // TODO: preserving the old behavior for now, but this check is strange. It's not
- // protected by any locks here, so for all we know rolling locks might start
- // as soon as we enter the "if". Is this best-effort optimization check?
if (!this.logRollRunning) {
checkLowReplication();
try {
@@ -1280,61 +1250,107 @@ class FSHLog implements HLog, Syncable {
return outputfiles.size();
}
- @Override
- public Long startCacheFlush(final byte[] encodedRegionName) {
- Long oldRegionSeqNum = null;
- if (!closeBarrier.beginOp()) {
- return null;
+ private byte[] getSnapshotName(byte[] encodedRegionName) {
+ byte snp[] = new byte[encodedRegionName.length + 3];
+ // an encoded region name has only hex digits. s, n or p are not hex
+ // and therefore snapshot-names will never collide with
+ // encoded-region-names
+ snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
+ for (int i = 0; i < encodedRegionName.length; i++) {
+ snp[i+3] = encodedRegionName[i];
}
- synchronized (oldestSeqNumsLock) {
- oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
- if (oldRegionSeqNum != null) {
- Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
- assert oldValue == null : "Flushing map not cleaned up for "
- + Bytes.toString(encodedRegionName);
- }
- }
- if (oldRegionSeqNum == null) {
- // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
- // the region is already flushing (which would make this call invalid), or there
- // were no appends after last flush, so why are we starting flush? Maybe we should
- // assert not null, and switch to "long" everywhere. Less rigorous, but safer,
- // alternative is telling the caller to stop. For now preserve old logic.
- LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
- + Bytes.toString(encodedRegionName) + "]");
+ return snp;
+ }
+
+ @Override
+ public long startCacheFlush(final byte[] encodedRegionName) {
+ this.cacheFlushLock.lock();
+ Long seq = this.lastSeqWritten.remove(encodedRegionName);
+ // seq is the lsn of the oldest edit associated with this region. If a
+ // snapshot already exists - because the last flush failed - then seq will
+ // be the lsn of the oldest edit in the snapshot
+ if (seq != null) {
+ // keeping the earliest sequence number of the snapshot in
+ // lastSeqWritten maintains the correctness of
+ // getOldestOutstandingSeqNum(). But it doesn't matter really because
+ // everything is being done inside of cacheFlush lock.
+ Long oldseq =
+ lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
+ if (oldseq != null) {
+ LOG.error("Logic Error Snapshot seq id from earlier flush still" +
+ " present! for region " + Bytes.toString(encodedRegionName) +
+ " overwritten oldseq=" + oldseq + "with new seq=" + seq);
+ Runtime.getRuntime().halt(1);
+ }
}
return obtainSeqNum();
}
@Override
- public void completeCacheFlush(final byte [] encodedRegionName)
- {
- synchronized (oldestSeqNumsLock) {
- this.oldestFlushingSeqNums.remove(encodedRegionName);
+ public void completeCacheFlush(final byte [] encodedRegionName,
+ final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
+ throws IOException {
+ try {
+ if (this.closed) {
+ return;
+ }
+ long txid = 0;
+ synchronized (updateLock) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ WALEdit edit = completeCacheFlushLogEdit();
+ HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
+ System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
+ logSyncerThread.append(new Entry(key, edit));
+ txid = this.unflushedEntries.incrementAndGet();
+ long took = EnvironmentEdgeManager.currentTimeMillis() - now;
+ long len = 0;
+ for (KeyValue kv : edit.getKeyValues()) {
+ len += kv.getLength();
+ }
+ this.metrics.finishAppend(took, len);
+ this.numEntries.incrementAndGet();
+ }
+ // sync txn to file system
+ this.sync(txid);
+
+ } finally {
+ // updateLock not needed for removing snapshot's entry
+ // Cleaning up of lastSeqWritten is in the finally clause because we
+ // don't want to confuse getOldestOutstandingSeqNum()
+ this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
+ this.cacheFlushLock.unlock();
}
- closeBarrier.endOp();
+ }
+
+ private WALEdit completeCacheFlushLogEdit() {
+ KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null,
+ System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH);
+ WALEdit e = new WALEdit();
+ e.add(kv);
+ return e;
}
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
- Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
- synchronized (oldestSeqNumsLock) {
- seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
- if (seqNumBeforeFlushStarts != null) {
- currentSeqNum =
- this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
- }
- }
- closeBarrier.endOp();
- if ((currentSeqNum != null)
- && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
- String errorStr = "Region " + Bytes.toString(encodedRegionName) +
- "acquired edits out of order current memstore seq=" + currentSeqNum
- + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
- LOG.error(errorStr);
- assert false : errorStr;
- Runtime.getRuntime().halt(1);
+ Long snapshot_seq =
+ this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
+ if (snapshot_seq != null) {
+ // updateLock not necessary because we are racing against
+ // lastSeqWritten.putIfAbsent() in append() and we will always win.
+ // Before releasing cacheFlushLock, make sure that the region's entry in
+ // lastSeqWritten points to the earliest edit in the region.
+ Long current_memstore_earliest_seq =
+ this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
+ if (current_memstore_earliest_seq != null &&
+ (current_memstore_earliest_seq.longValue() <=
+ snapshot_seq.longValue())) {
+ LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
+ "acquired edits out of order current memstore seq=" +
+ current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
+ Runtime.getRuntime().halt(1);
+ }
}
+ this.cacheFlushLock.unlock();
}
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Jan 21 21:36:25 2013
@@ -162,14 +162,14 @@ public interface HLog {
}
}
- /**
+ /*
* registers WALActionsListener
*
* @param listener
*/
public void registerWALActionsListener(final WALActionsListener listener);
- /**
+ /*
* unregisters WALActionsListener
*
* @param listener
@@ -200,10 +200,18 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
+ * Because a log cannot be rolled during a cache flush, and a cache flush
+ * spans two method calls, a special lock needs to be obtained so that a cache
+ * flush cannot start when the log is being rolled and the log cannot be
+ * rolled during a cache flush.
+ *
* <p>
- * The implementation is synchronized in order to make sure there's one rollWriter
- * running at any given time.
- *
+ * Note that this method cannot be synchronized because it is possible that
+ * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
+ * start which would obtain the lock on this but block on obtaining the
+ * cacheFlushLock and then completeCacheFlush could be called which would wait
+ * for the lock on this and consequently never release the cacheFlushLock
+ *
* @return If lots of logs, flush the returned regions so next time through we
* can clean logs. Returns null if nothing to flush. Names are actual
* region names as returned by {@link HRegionInfo#getEncodedName()}
@@ -215,9 +223,17 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
+ * Because a log cannot be rolled during a cache flush, and a cache flush
+ * spans two method calls, a special lock needs to be obtained so that a cache
+ * flush cannot start when the log is being rolled and the log cannot be
+ * rolled during a cache flush.
+ *
* <p>
- * The implementation is synchronized in order to make sure there's one rollWriter
- * running at any given time.
+ * Note that this method cannot be synchronized because it is possible that
+ * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
+ * start which would obtain the lock on this but block on obtaining the
+ * cacheFlushLock and then completeCacheFlush could be called which would wait
+ * for the lock on this and consequently never release the cacheFlushLock
*
* @param force
* If true, force creation of a new writer even if no entries have
@@ -321,33 +337,53 @@ public interface HLog {
public long obtainSeqNum();
/**
- * WAL keeps track of the sequence numbers that were not yet flushed from memstores
- * in order to be able to do cleanup. This method tells WAL that some region is about
- * to flush memstore.
- *
- * We stash the oldest seqNum for the region, and let the the next edit inserted in this
- * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
- * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
- * in case of flush succeeding, the seqNum of that first edit after start becomes the
- * valid oldest seqNum for this region.
- *
- * @return current seqNum, to pass on to flushers (who will put it into the metadata of
- * the resulting file as an upper-bound seqNum for that file), or NULL if flush
- * should not be started.
+ * By acquiring a log sequence ID, we can allow log messages to continue while
+ * we flush the cache.
+ *
+ * Acquire a lock so that we do not roll the log between the start and
+ * completion of a cache-flush. Otherwise the log-seq-id for the flush will
+ * not appear in the correct logfile.
+ *
+ * Ensuring that flushes and log-rolls don't happen concurrently also allows
+ * us to temporarily put a log-seq-number in lastSeqWritten against the region
+ * being flushed that might not be the earliest in-memory log-seq-number for
+ * that region. By the time the flush is completed or aborted and before the
+ * cacheFlushLock is released it is ensured that lastSeqWritten again has the
+ * oldest in-memory edit's lsn for the region that was being flushed.
+ *
+ * In this method, by removing the entry in lastSeqWritten for the region
+ * being flushed we ensure that the next edit inserted in this region will be
+ * correctly recorded in
+ * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}. The
+ * lsn of the earliest in-memory edit - which is now in the memstore snapshot -
+ * is saved temporarily in the lastSeqWritten map while the flush is active.
+ *
+ * @return sequence ID to pass to
+ * {@link #completeCacheFlush(byte[], byte[], long, boolean)} when the
+ * flush completes
+ * @see #completeCacheFlush(byte[], byte[], long, boolean)
+ * @see #abortCacheFlush(byte[])
*/
- public Long startCacheFlush(final byte[] encodedRegionName);
+ public long startCacheFlush(final byte[] encodedRegionName);
/**
- * Complete the cache flush.
- * @param encodedRegionName Encoded region name.
+ * Complete the cache flush
+ *
+ * Protected by cacheFlushLock
+ *
+ * @param encodedRegionName
+ * @param tableName
+ * @param logSeqId
+ * @throws IOException
*/
- public void completeCacheFlush(final byte[] encodedRegionName);
+ public void completeCacheFlush(final byte[] encodedRegionName,
+ final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
+ throws IOException;
/**
* Abort a cache flush. Call if the flush fails. Note that the only recovery
* for an aborted flush currently is a restart of the regionserver so the
- * snapshot content dropped by the failure gets restored to the memstore.v
- * @param encodedRegionName Encoded region name.
+ * snapshot content dropped by the failure gets restored to the memstore.
*/
public void abortCacheFlush(byte[] encodedRegionName);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Mon Jan 21 21:36:25 2013
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.util.Byte
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
+ static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
+
/**
* @param family
* @return true if the column is a meta column
@@ -242,6 +244,32 @@ public class HLogUtil {
}
/**
+ * Return regions (memstores) that have edits with sequence ids less than or
+ * equal to the passed <code>oldestWALseqid</code>.
+ *
+ * @param oldestWALseqid
+ * @param regionsToSeqids
+ * Encoded region names to sequence ids
+ * @return All regions whose seqid is less than or equal to
+ * <code>oldestWALseqid</code> (not necessarily in order). Null if no regions found.
+ */
+ static byte[][] findMemstoresWithEditsEqualOrOlderThan(
+ final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
+ // This method is static so it can be unit tested more easily.
+ List<byte[]> regions = null;
+ for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
+ if (e.getValue().longValue() <= oldestWALseqid) {
+ if (regions == null)
+ regions = new ArrayList<byte[]>();
+ // Key is encoded region name.
+ regions.add(e.getKey());
+ }
+ }
+ return regions == null ? null : regions
+ .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+ }
+
+ /**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
*
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Mon Jan 21 21:36:25 2013
@@ -244,12 +244,7 @@ public class SequenceFileLogWriter imple
@Override
public void sync() throws IOException {
- try {
- this.writer.syncFs();
- } catch (NullPointerException npe) {
- // Concurrent close...
- throw new IOException(npe);
- }
+ this.writer.syncFs();
}
@Override
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Jan 21 21:36:25 2013
@@ -323,11 +323,11 @@ public class TestHLog {
regionsToSeqids.put(l.toString().getBytes(), l);
}
byte [][] regions =
- FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
+ HLogUtil.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
assertEquals(2, regions.length);
assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
Bytes.equals(regions[0], "1".getBytes()));
- regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
+ regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
int count = 4;
assertEquals(count, regions.length);
// Regions returned are not ordered.
@@ -518,8 +518,9 @@ public class TestHLog {
htd.addFamily(new HColumnDescriptor("column"));
log.append(info, tableName, cols, System.currentTimeMillis(), htd);
- log.startCacheFlush(info.getEncodedNameAsBytes());
- log.completeCacheFlush(info.getEncodedNameAsBytes());
+ long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes());
+ log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
+ info.isMetaRegion());
log.close();
Path filename = ((FSHLog) log).computeFilename();
log = null;
@@ -539,6 +540,20 @@ public class TestHLog {
assertEquals((byte)(i + '0'), kv.getValue()[0]);
System.out.println(key + " " + val);
}
+ HLog.Entry entry = null;
+ while ((entry = reader.next(null)) != null) {
+ HLogKey key = entry.getKey();
+ WALEdit val = entry.getEdit();
+ // Assert only one more row... the meta flushed row.
+ assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
+ assertTrue(Bytes.equals(tableName, key.getTablename()));
+ KeyValue kv = val.getKeyValues().get(0);
+ assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
+ assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
+ assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
+ val.getKeyValues().get(0).getValue()));
+ System.out.println(key + " " + val);
+ }
} finally {
if (log != null) {
log.closeAndDelete();
@@ -574,8 +589,8 @@ public class TestHLog {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor("column"));
log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
- log.startCacheFlush(hri.getEncodedNameAsBytes());
- log.completeCacheFlush(hri.getEncodedNameAsBytes());
+ long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
log.close();
Path filename = ((FSHLog) log).computeFilename();
log = null;
@@ -593,6 +608,20 @@ public class TestHLog {
System.out.println(entry.getKey() + " " + val);
idx++;
}
+
+ // Get next row... the meta flushed row.
+ entry = reader.next();
+ assertEquals(1, entry.getEdit().size());
+ for (KeyValue val : entry.getEdit().getKeyValues()) {
+ assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
+ entry.getKey().getEncodedRegionName()));
+ assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
+ assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
+ assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
+ assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
+ val.getValue()));
+ System.out.println(entry.getKey() + " " + val);
+ }
} finally {
if (log != null) {
log.closeAndDelete();
@@ -676,19 +705,17 @@ public class TestHLog {
assertEquals(3, ((FSHLog) log).getNumLogFiles());
// Flush the first region, we expect to see the first two files getting
- // archived. We need to append something or writer won't be rolled.
- addEdits(log, hri2, tableName2, 1);
- log.startCacheFlush(hri.getEncodedNameAsBytes());
- log.completeCacheFlush(hri.getEncodedNameAsBytes());
+ // archived
+ long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
log.rollWriter();
assertEquals(2, ((FSHLog) log).getNumLogFiles());
// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
// flush information
- addEdits(log, hri2, tableName2, 1);
- log.startCacheFlush(hri2.getEncodedNameAsBytes());
- log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+ seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
log.rollWriter();
assertEquals(0, ((FSHLog) log).getNumLogFiles());
} finally {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java Mon Jan 21 21:36:25 2013
@@ -117,7 +117,7 @@ public class TestWALActionsListener {
assertEquals(11, observer.postLogRollCounter);
assertEquals(5, laterobserver.preLogRollCounter);
assertEquals(5, laterobserver.postLogRollCounter);
- assertEquals(1, observer.closedCount);
+ assertEquals(2, observer.closedCount);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Jan 21 21:36:25 2013
@@ -557,8 +557,8 @@ public class TestWALReplay {
}
// Add a cache flush, shouldn't have any effect
- wal.startCacheFlush(regionName);
- wal.completeCacheFlush(regionName);
+ long logSeqId = wal.startCacheFlush(regionName);
+ wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
@@ -661,7 +661,7 @@ public class TestWALReplay {
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
// set of edits.
- wal.completeCacheFlush(hri.getEncodedNameAsBytes());
+ wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
@@ -686,11 +686,12 @@ public class TestWALReplay {
}
@Override
- public void completeCacheFlush(byte[] encodedRegionName) {
+ public void completeCacheFlush(byte[] encodedRegionName, byte[] tableName, long logSeqId,
+ boolean isMetaRegion) throws IOException {
if (!doCompleteCacheFlush) {
return;
}
- super.completeCacheFlush(encodedRegionName);
+ super.completeCacheFlush(encodedRegionName, tableName, logSeqId, isMetaRegion);
}
}