You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/12/16 20:50:56 UTC
[1/2] hbase git commit: HBASE-10201 Port 'Make flush decisions per
column family' to trunk
Repository: hbase
Updated Branches:
refs/heads/master a411227b0 -> c7fad665f
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index ced3383..d2ba69d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -31,10 +33,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -50,7 +54,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -64,15 +67,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -80,6 +75,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.htrace.NullScope;
@@ -88,6 +90,7 @@ import org.htrace.Trace;
import org.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
@@ -333,33 +336,35 @@ public class FSHLog implements WAL {
// sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
// done above in failedSequence, highest sequence, etc.
/**
- * This lock ties all operations on oldestFlushingRegionSequenceIds and
- * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into
- * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or
- * to find regions with old sequence ids to force flush; we are interested in old stuff not the
- * new additions (TODO: IS THIS SAFE? CHECK!).
+ * This lock ties all operations on lowestFlushingStoreSequenceIds and
+ * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
+ * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions
+ * sequence id, or to find regions with old sequence ids to force flush; we are interested in
+ * old stuff not the new additions (TODO: IS THIS SAFE? CHECK!).
*/
private final Object regionSequenceIdLock = new Object();
/**
- * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived --
- * sequence id in memstore. Note that this sequence id is the region sequence id. This is not
- * related to the id we use above for {@link #highestSyncedSequence} and
- * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer.
+ * Map of encoded region names and family names to their OLDEST -- i.e. their first,
+ * the longest-lived -- sequence id in memstore. Note that this sequence id is the region
+ * sequence id. This is not related to the id we use above for {@link #highestSyncedSequence}
+ * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
+ * ring buffer.
*/
- private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedRegionSequenceIds =
- new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+ private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
+ = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
+ Bytes.BYTES_COMPARATOR);
/**
- * Map of encoded region names to their lowest or OLDEST sequence/edit id in memstore currently
- * being flushed out to hfiles. Entries are moved here from
- * {@link #oldestUnflushedRegionSequenceIds} while the lock {@link #regionSequenceIdLock} is held
+ * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
+ * memstore currently being flushed out to hfiles. Entries are moved here from
+ * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
* (so movement between the Maps is atomic). This is not related to the id we use above for
* {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
* the disruptor ring buffer, an internal detail.
*/
- private final Map<byte[], Long> lowestFlushingRegionSequenceIds =
- new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
+ new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
/**
* Map of region encoded names to the latest region sequence id. Updated on each append of
@@ -734,6 +739,28 @@ public class FSHLog implements WAL {
return DefaultWALProvider.createWriter(conf, fs, path, false);
}
+ private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
+ long result = HConstants.NO_SEQNUM;
+ for (Long seqNum: seqIdMap.values()) {
+ if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
+ result = seqNum.longValue();
+ }
+ }
+ return result;
+ }
+
+ private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
+ Map<byte[], T> mapToCopy) {
+ Map<byte[], Long> copied = Maps.newHashMap();
+ for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
+ long lowestSeqId = getLowestSeqId(entry.getValue());
+ if (lowestSeqId != HConstants.NO_SEQNUM) {
+ copied.put(entry.getKey(), lowestSeqId);
+ }
+ }
+ return copied;
+ }
+
/**
* Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
* have been flushed to hfiles.
@@ -746,22 +773,23 @@ public class FSHLog implements WAL {
* @throws IOException
*/
private void cleanOldLogs() throws IOException {
- Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
- Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
+ Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
+ Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
List<Path> logsToArchive = new ArrayList<Path>();
// make a local copy so as to avoid locking when we iterate over these maps.
synchronized (regionSequenceIdLock) {
- oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.lowestFlushingRegionSequenceIds);
- oldestUnflushedSeqNumsLocal =
- new HashMap<byte[], Long>(this.oldestUnflushedRegionSequenceIds);
+ lowestFlushingRegionSequenceIdsLocal =
+ copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
+ oldestUnflushedRegionSequenceIdsLocal =
+ copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
}
for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
// iterate over the log file.
Path log = e.getKey();
Map<byte[], Long> sequenceNums = e.getValue();
// iterate over the map for this log file, and tell whether it should be archived or not.
- if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
- oldestUnflushedSeqNumsLocal)) {
+ if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
+ oldestUnflushedRegionSequenceIdsLocal)) {
logsToArchive.add(log);
LOG.debug("WAL file ready for archiving " + log);
}
@@ -815,10 +843,11 @@ public class FSHLog implements WAL {
List<byte[]> regionsToFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (regionSequenceIdLock) {
- for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
- Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey());
- if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
- if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
+ for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
+ long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
+ if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
+ if (regionsToFlush == null)
+ regionsToFlush = new ArrayList<byte[]>();
regionsToFlush.add(e.getKey());
}
}
@@ -1584,36 +1613,53 @@ public class FSHLog implements WAL {
// +1 for current use log
return getNumRolledLogFiles() + 1;
}
-
+
// public only until class moves to o.a.h.h.wal
/** @return the size of log files in use */
public long getLogFileSize() {
return this.totalLogSize.get();
}
-
+
@Override
- public boolean startCacheFlush(final byte[] encodedRegionName) {
- Long oldRegionSeqNum = null;
+ public boolean startCacheFlush(final byte[] encodedRegionName,
+ Set<byte[]> flushedFamilyNames) {
+ Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
if (!closeBarrier.beginOp()) {
LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
" - because the server is closing.");
return false;
}
synchronized (regionSequenceIdLock) {
- oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
- if (oldRegionSeqNum != null) {
- Long oldValue =
- this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum);
- assert oldValue ==
- null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName);
+ ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+ oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+ if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+ for (byte[] familyName: flushedFamilyNames) {
+ Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
+ if (seqId != null) {
+ oldStoreSeqNum.put(familyName, seqId);
+ }
+ }
+ if (!oldStoreSeqNum.isEmpty()) {
+ Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
+ encodedRegionName, oldStoreSeqNum);
+ assert oldValue == null: "Flushing map not cleaned up for "
+ + Bytes.toString(encodedRegionName);
+ }
+ if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
+        // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds forever
+ // even if the region is already moved to other server.
+ // Do not worry about data racing, we held write lock of region when calling
+ // startCacheFlush, so no one can add value to the map we removed.
+ oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
+ }
}
}
- if (oldRegionSeqNum == null) {
- // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
- // the region is already flushing (which would make this call invalid), or there
- // were no appends after last flush, so why are we starting flush? Maybe we should
- // assert not null, and switch to "long" everywhere. Less rigorous, but safer,
- // alternative is telling the caller to stop. For now preserve old logic.
+ if (oldStoreSeqNum.isEmpty()) {
+ // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
+ // the region is already flushing (which would make this call invalid), or there
+ // were no appends after last flush, so why are we starting flush? Maybe we should
+ // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
+ // For now preserve old logic.
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ Bytes.toString(encodedRegionName) + "]");
}
@@ -1623,30 +1669,59 @@ public class FSHLog implements WAL {
@Override
public void completeCacheFlush(final byte [] encodedRegionName) {
synchronized (regionSequenceIdLock) {
- this.lowestFlushingRegionSequenceIds.remove(encodedRegionName);
+ this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
}
closeBarrier.endOp();
}
+ private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
+ byte[] encodedRegionName) {
+ ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+ oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+ if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+ return oldestUnflushedStoreSequenceIdsOfRegion;
+ }
+ oldestUnflushedStoreSequenceIdsOfRegion =
+ new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ ConcurrentMap<byte[], Long> alreadyPut =
+ oldestUnflushedStoreSequenceIds.put(encodedRegionName,
+ oldestUnflushedStoreSequenceIdsOfRegion);
+ return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
+ }
+
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
- Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
+ Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
+ Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (regionSequenceIdLock) {
- seqNumBeforeFlushStarts = this.lowestFlushingRegionSequenceIds.remove(encodedRegionName);
- if (seqNumBeforeFlushStarts != null) {
- currentSeqNum =
- this.oldestUnflushedRegionSequenceIds.put(encodedRegionName, seqNumBeforeFlushStarts);
+ storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
+ encodedRegionName);
+ if (storeSeqNumsBeforeFlushStarts != null) {
+ ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+ getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
+ for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
+ .entrySet()) {
+ currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
+ oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
+ familyNameAndSeqId.getValue()));
+ }
}
}
closeBarrier.endOp();
- if ((currentSeqNum != null)
- && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
- String errorStr = "Region " + Bytes.toString(encodedRegionName) +
- "acquired edits out of order current memstore seq=" + currentSeqNum
- + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
- LOG.error(errorStr);
- assert false : errorStr;
- Runtime.getRuntime().halt(1);
+ if (storeSeqNumsBeforeFlushStarts != null) {
+ for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
+ Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
+ if (currentSeqNum != null
+ && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
+ String errorStr =
+ "Region " + Bytes.toString(encodedRegionName) + " family "
+ + Bytes.toString(familyNameAndSeqId.getKey())
+ + " acquired edits out of order current memstore seq=" + currentSeqNum
+ + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
+ LOG.error(errorStr);
+ Runtime.getRuntime().halt(1);
+ }
+ }
}
}
@@ -1677,8 +1752,23 @@ public class FSHLog implements WAL {
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
- Long result = oldestUnflushedRegionSequenceIds.get(encodedRegionName);
- return result == null ? HConstants.NO_SEQNUM : result.longValue();
+ ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+ this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+ return oldestUnflushedStoreSequenceIdsOfRegion != null ?
+ getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
+ }
+
+ @Override
+ public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
+ byte[] familyName) {
+ ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+ this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+ if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+ Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
+ return result != null ? result.longValue() : HConstants.NO_SEQNUM;
+ } else {
+ return HConstants.NO_SEQNUM;
+ }
}
/**
@@ -1914,6 +2004,15 @@ public class FSHLog implements WAL {
}
}
+ private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
+ Set<byte[]> familyNameSet, Long lRegionSequenceId) {
+ ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+ getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
+ for (byte[] familyName : familyNameSet) {
+ oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
+ }
+ }
+
/**
* Append to the WAL. Does all CP and WAL listener calls.
* @param entry
@@ -1961,9 +2060,10 @@ public class FSHLog implements WAL {
Long lRegionSequenceId = Long.valueOf(regionSequenceId);
highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
if (entry.isInMemstore()) {
- oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
+ updateOldestUnflushedSequenceIds(encodedRegionName,
+ entry.getFamilyNames(), lRegionSequenceId);
}
-
+
coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index d9942b3..147a13d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -19,13 +19,21 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+
+import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -96,7 +104,7 @@ class FSWALEntry extends Entry {
*/
long stampRegionSequenceId() throws IOException {
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
- if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) {
+ if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
for (Cell cell : this.memstoreCells) {
CellUtil.setSequenceId(cell, regionSequenceId);
}
@@ -105,4 +113,21 @@ class FSWALEntry extends Entry {
key.setLogSeqNum(regionSequenceId);
return regionSequenceId;
}
+
+ /**
+   * @return the family names which are affected by this edit.
+ */
+ Set<byte[]> getFamilyNames() {
+ ArrayList<Cell> cells = this.getEdit().getCells();
+ if (CollectionUtils.isEmpty(cells)) {
+ return Collections.<byte[]>emptySet();
+ }
+ Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
+ for (Cell cell : cells) {
+ if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ familySet.add(CellUtil.cloneFamily(cell));
+ }
+ }
+ return familySet;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index e0fc35c..f571166 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -182,7 +183,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public boolean startCacheFlush(final byte[] encodedRegionName) {
+ public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
return !(closed.get());
}
@@ -205,6 +206,11 @@ class DisabledWALProvider implements WALProvider {
}
@Override
+ public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+ return HConstants.NO_SEQNUM;
+ }
+
+ @Override
public String toString() {
return "WAL disabled.";
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 23f8c9f..5a2b08d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -152,7 +153,7 @@ public interface WAL {
* @return true if the flush can proceed, false in case wal is closing (usually, when server is
* closing) and flush couldn't be started.
*/
- boolean startCacheFlush(final byte[] encodedRegionName);
+ boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
/**
* Complete the cache flush.
@@ -182,6 +183,14 @@ public interface WAL {
long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
/**
+ * Gets the earliest sequence number in the memstore for this particular region and store.
+ * @param encodedRegionName The region to get the number for.
+ * @param familyName The family to get the number for.
+ * @return The number if present, HConstants.NO_SEQNUM if absent.
+ */
+ long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);
+
+ /**
* Human readable identifying information about the state of this WAL.
* Implementors are encouraged to include information appropriate for debugging.
* Consumers are advised not to rely on the details of the returned String; it does
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index a9493c7..777ecea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -268,7 +268,7 @@ public class TestIOFencing {
compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
LOG.info("Blocking compactions");
compactingRegion.stopCompactions();
- long lastFlushTime = compactingRegion.getLastFlushTime();
+ long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
// Load some rows
TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
@@ -284,7 +284,7 @@ public class TestIOFencing {
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
- while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
+ while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
compactingRegion.countStoreFiles() <= 1) {
LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString());
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
index ace24b1..676885b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
@@ -34,8 +34,8 @@ public class TestFlushRegionEntry {
@Test
public void test() {
- FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class));
- FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class));
+ FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class), true);
+ FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class), true);
assertEquals(entry.hashCode(), other.hashCode());
assertEquals(entry, other);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 91de97c..c1eeea0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -113,11 +113,11 @@ public class TestHeapMemoryManager {
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
- memStoreFlusher.requestFlush(null);
- memStoreFlusher.requestFlush(null);
- memStoreFlusher.requestFlush(null);
+ memStoreFlusher.requestFlush(null, false);
+ memStoreFlusher.requestFlush(null, false);
+ memStoreFlusher.requestFlush(null, false);
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
- memStoreFlusher.requestFlush(null);
+ memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
@@ -127,8 +127,8 @@ public class TestHeapMemoryManager {
oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
- memStoreFlusher.requestFlush(null);
- memStoreFlusher.requestFlush(null);
+ memStoreFlusher.requestFlush(null, false);
+ memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
@@ -408,12 +408,12 @@ public class TestHeapMemoryManager {
}
@Override
- public void requestFlush(HRegion region) {
+ public void requestFlush(HRegion region, boolean forceFlushAllStores) {
this.listener.flushRequested(flushType, region);
}
@Override
- public void requestDelayedFlush(HRegion region, long delay) {
+ public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
new file mode 100644
index 0000000..43a9575
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.hash.Hashing;
+
+/**
+ * This test verifies the correctness of the Per Column Family flushing strategy
+ */
+@Category(MediumTests.class)
+public class TestPerColumnFamilyFlush {
+ private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class);
+
+ HRegion region = null;
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
+
+ public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
+
+ public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
+ Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
+
+ public static final byte[] FAMILY1 = families[0];
+
+ public static final byte[] FAMILY2 = families[1];
+
+ public static final byte[] FAMILY3 = families[2];
+
+ private void initHRegion(String callingMethod, Configuration conf) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+ for (byte[] family : families) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
+ Path path = new Path(DIR, callingMethod);
+ region = HRegion.createHRegion(info, path, conf, htd);
+ }
+
+ // A helper function to create puts.
+ private Put createPut(int familyNum, int putNum) {
+ byte[] qf = Bytes.toBytes("q" + familyNum);
+ byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+ byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+ Put p = new Put(row);
+ p.add(families[familyNum - 1], qf, val);
+ return p;
+ }
+
+ // A helper function to create gets.
+ private Get createGet(int familyNum, int putNum) {
+ byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+ return new Get(row);
+ }
+
+ // A helper function to verify edits.
+ void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+ Result r = table.get(createGet(familyNum, putNum));
+ byte[] family = families[familyNum - 1];
+ byte[] qf = Bytes.toBytes("q" + familyNum);
+ byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+ assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
+ assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+ r.getFamilyMap(family).get(qf));
+ assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
+ Arrays.equals(r.getFamilyMap(family).get(qf), val));
+ }
+
+ @Test
+ public void testSelectiveFlushWhenEnabled() throws IOException {
+ // Set up the configuration
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+ FlushLargeStoresPolicy.class.getName());
+ conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+ 100 * 1024);
+ // Initialize the HRegion
+ initHRegion("testSelectiveFlushWhenEnabled", conf);
+ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+ for (int i = 1; i <= 1200; i++) {
+ region.put(createPut(1, i));
+
+ if (i <= 100) {
+ region.put(createPut(2, i));
+ if (i <= 50) {
+ region.put(createPut(3, i));
+ }
+ }
+ }
+
+ long totalMemstoreSize = region.getMemstoreSize().get();
+
+ // Find the smallest LSNs for edits wrt to each CF.
+ long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
+ long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
+ long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
+
+ // Find the sizes of the memstores of each CF.
+ long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+ // Get the overall smallest LSN in the region's memstores.
+ long smallestSeqInRegionCurrentMemstore =
+ region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+ // The overall smallest LSN in the region's memstores should be the same as
+ // the LSN of the smallest edit in CF1
+ assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+ // Some other sanity checks.
+ assertTrue(smallestSeqCF1 < smallestSeqCF2);
+ assertTrue(smallestSeqCF2 < smallestSeqCF3);
+ assertTrue(cf1MemstoreSize > 0);
+ assertTrue(cf2MemstoreSize > 0);
+ assertTrue(cf3MemstoreSize > 0);
+
+ // The total memstore size should be the same as the sum of the sizes of
+ // memstores of CF1, CF2 and CF3.
+ assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+ + cf2MemstoreSize + cf3MemstoreSize);
+
+ // Flush!
+ region.flushcache(false);
+
+ // Will use these to check if anything changed.
+ long oldCF2MemstoreSize = cf2MemstoreSize;
+ long oldCF3MemstoreSize = cf3MemstoreSize;
+
+ // Recalculate everything
+ cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+ totalMemstoreSize = region.getMemstoreSize().get();
+ smallestSeqInRegionCurrentMemstore =
+ region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+ // We should have cleared out only CF1, since we chose the flush thresholds
+ // and number of puts accordingly.
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+ // Nothing should have happened to CF2, ...
+ assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
+ // ... or CF3
+ assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
+ // Now the smallest LSN in the region should be the same as the smallest
+ // LSN in the memstore of CF2.
+ assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
+ // Of course, this should hold too.
+ assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+ + cf3MemstoreSize);
+
+ // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
+ for (int i = 1200; i < 2400; i++) {
+ region.put(createPut(2, i));
+
+ // Add only 100 puts for CF3
+ if (i - 1200 < 100) {
+ region.put(createPut(3, i));
+ }
+ }
+
+ // How much does the CF3 memstore occupy? Will be used later.
+ oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+ // Flush again
+ region.flushcache(false);
+
+ // Recalculate everything
+ cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+ totalMemstoreSize = region.getMemstoreSize().get();
+ smallestSeqInRegionCurrentMemstore =
+ region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+ // CF1 and CF2, both should be absent.
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+ // CF3 shouldn't have been touched.
+ assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
+ assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+ assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
+
+ // What happens when we hit the memstore limit, but we are not able to find
+ // any Column Family above the threshold?
+ // In that case, we should flush all the CFs.
+
+ // Clearing the existing memstores.
+ region.flushcache(true);
+
+ // The memstore limit is 200*1024 and the column family flush threshold is
+ // around 50*1024. We try to just hit the memstore limit with each CF's
+ // memstore being below the CF flush threshold.
+ for (int i = 1; i <= 300; i++) {
+ region.put(createPut(1, i));
+ region.put(createPut(2, i));
+ region.put(createPut(3, i));
+ region.put(createPut(4, i));
+ region.put(createPut(5, i));
+ }
+
+ region.flushcache(false);
+ // Since we won't find any CF above the threshold, and hence no specific
+ // store to flush, we should flush all the memstores.
+ assertEquals(0, region.getMemstoreSize().get());
+ }
+
+ @Test
+ public void testSelectiveFlushWhenNotEnabled() throws IOException {
+ // Set up the configuration
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+
+ // Initialize the HRegion
+ initHRegion("testSelectiveFlushWhenNotEnabled", conf);
+ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+ for (int i = 1; i <= 1200; i++) {
+ region.put(createPut(1, i));
+ if (i <= 100) {
+ region.put(createPut(2, i));
+ if (i <= 50) {
+ region.put(createPut(3, i));
+ }
+ }
+ }
+
+ long totalMemstoreSize = region.getMemstoreSize().get();
+
+ // Find the sizes of the memstores of each CF.
+ long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+ // Some other sanity checks.
+ assertTrue(cf1MemstoreSize > 0);
+ assertTrue(cf2MemstoreSize > 0);
+ assertTrue(cf3MemstoreSize > 0);
+
+ // The total memstore size should be the same as the sum of the sizes of
+ // memstores of CF1, CF2 and CF3.
+ assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+ + cf2MemstoreSize + cf3MemstoreSize);
+
+ // Flush!
+ region.flushcache(false);
+
+ cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+ totalMemstoreSize = region.getMemstoreSize().get();
+ long smallestSeqInRegionCurrentMemstore =
+ region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+ // Everything should have been cleared
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+ assertEquals(0, totalMemstoreSize);
+ assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
+ }
+
+ // Find the (first) region which has the specified name.
+ private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
+ MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+ List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+ for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+ HRegionServer hrs = rsts.get(i).getRegionServer();
+ for (HRegion region : hrs.getOnlineRegions(tableName)) {
+ return Pair.newPair(region, hrs);
+ }
+ }
+ return null;
+ }
+
+ @Test
+ public void testLogReplay() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+ // Carefully chosen limits so that the memstore just flushes when we're done
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+ FlushLargeStoresPolicy.class.getName());
+ conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000);
+ final int numRegionServers = 4;
+ TEST_UTIL.startMiniCluster(numRegionServers);
+ TEST_UTIL.getHBaseAdmin().createNamespace(
+ NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+ HTable table = TEST_UTIL.createTable(TABLENAME, families);
+ HTableDescriptor htd = table.getTableDescriptor();
+
+ for (byte[] family : families) {
+ if (!htd.hasFamily(family)) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ }
+
+ // Add 80 edits for CF1, 10 for CF2, 10 for CF3.
+ // These will all be interleaved in the log.
+ for (int i = 1; i <= 80; i++) {
+ table.put(createPut(1, i));
+ if (i <= 10) {
+ table.put(createPut(2, i));
+ table.put(createPut(3, i));
+ }
+ }
+ table.flushCommits();
+ Thread.sleep(1000);
+
+ Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
+ HRegion desiredRegion = desiredRegionAndServer.getFirst();
+ assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
+
+ // Flush the region selectively.
+ desiredRegion.flushcache(false);
+
+ long totalMemstoreSize;
+ long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
+ totalMemstoreSize = desiredRegion.getMemstoreSize().get();
+
+ // Find the sizes of the memstores of each CF.
+ cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+
+ // CF1 Should have been flushed
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+ // CF2 and CF3 shouldn't have been flushed.
+ assertTrue(cf2MemstoreSize > 0);
+ assertTrue(cf3MemstoreSize > 0);
+ assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+ + cf3MemstoreSize);
+
+ // Wait for the RS report to go across to the master, so that the master
+ // is aware of which sequence ids have been flushed, before we kill the RS.
+ // If in production, the RS dies before the report goes across, we will
+ // safely replay all the edits.
+ Thread.sleep(2000);
+
+ // Abort the region server where we have the region hosted.
+ HRegionServer rs = desiredRegionAndServer.getSecond();
+ rs.abort("testing");
+
+ // The aborted region server's regions will be eventually assigned to some
+ // other region server, and the get RPC call (inside verifyEdit()) will
+ // retry for some time till the regions come back up.
+
+ // Verify that all the edits are safe.
+ for (int i = 1; i <= 80; i++) {
+ verifyEdit(1, i, table);
+ if (i <= 10) {
+ verifyEdit(2, i, table);
+ verifyEdit(3, i, table);
+ }
+ }
+
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ // Test Log Replay with Distributed Replay on.
+ // In distributed log replay, the log splitters ask the master for the
+ // last flushed sequence id for a region. This test would ensure that we
+ // are doing the book-keeping correctly.
+ @Test
+ public void testLogReplayWithDistributedReplay() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ testLogReplay();
+ }
+
+ /**
+ * When a log roll is about to happen, we do a flush of the regions who will be affected by the
+ * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
+ * test ensures that we do a full-flush in that scenario.
+ * @throws IOException
+ */
+ @Test
+ public void testFlushingWhenLogRolling() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+ FlushLargeStoresPolicy.class.getName());
+ conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000);
+
+ // Also, let us try real hard to get a log roll to happen.
+ // Keeping the log roll period to 2s.
+ conf.setLong("hbase.regionserver.logroll.period", 2000);
+ // Keep the block size small so that we fill up the log files very fast.
+ conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+ int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+ final int numRegionServers = 4;
+ TEST_UTIL.startMiniCluster(numRegionServers);
+ TEST_UTIL.getHBaseAdmin().createNamespace(
+ NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+ HTable table = TEST_UTIL.createTable(TABLENAME, families);
+ HTableDescriptor htd = table.getTableDescriptor();
+
+ for (byte[] family : families) {
+ if (!htd.hasFamily(family)) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ }
+
+ HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst();
+ assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
+
+ // Add some edits. Most will be for CF1, some for CF2 and CF3.
+ for (int i = 1; i <= 10000; i++) {
+ table.put(createPut(1, i));
+ if (i <= 200) {
+ table.put(createPut(2, i));
+ table.put(createPut(3, i));
+ }
+ table.flushCommits();
+ // Keep adding until we exceed the number of log files, so that we are
+ // able to trigger the cleaning of old log files.
+ int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles();
+ if (currentNumLogFiles > maxLogs) {
+ LOG.info("The number of log files is now: " + currentNumLogFiles
+ + ". Expect a log roll and memstore flush.");
+ break;
+ }
+ }
+ table.close();
+ // Wait for some time till the flush caused by log rolling happens.
+ Thread.sleep(4000);
+
+ // We have artificially created the conditions for a log roll. When a
+ // log roll happens, we should flush all the column families. Testing that
+ // case here.
+
+ // Individual families should have been flushed.
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize());
+
+ // And of course, the total memstore should also be clean.
+ assertEquals(0, desiredRegion.getMemstoreSize().get());
+
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private void doPut(HTableInterface table) throws IOException {
+ // cf1 100B per row, cf2 200B per row and cf3 400B per row
+ byte[] qf = Bytes.toBytes("qf");
+ Random rand = new Random();
+ byte[] value1 = new byte[100];
+ byte[] value2 = new byte[200];
+ byte[] value3 = new byte[400];
+ for (int i = 0; i < 10000; i++) {
+ Put put = new Put(Bytes.toBytes("row-" + i));
+ rand.setSeed(i);
+ rand.nextBytes(value1);
+ rand.nextBytes(value2);
+ rand.nextBytes(value3);
+ put.add(FAMILY1, qf, value1);
+ put.add(FAMILY2, qf, value2);
+ put.add(FAMILY3, qf, value3);
+ table.put(put);
+ }
+ }
+
+ // Under the same write load, small stores should have fewer store files when
+ // per-column-family flush is enabled.
+ @Test
+ public void testCompareStoreFileCount() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+ conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+ 400 * 1024);
+ conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+ conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+ ConstantSizeRegionSplitPolicy.class.getName());
+
+ HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+ htd.setCompactionEnabled(false);
+ htd.addFamily(new HColumnDescriptor(FAMILY1));
+ htd.addFamily(new HColumnDescriptor(FAMILY2));
+ htd.addFamily(new HColumnDescriptor(FAMILY3));
+
+ LOG.info("==============Test with selective flush disabled===============");
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.getHBaseAdmin().createNamespace(
+ NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ getRegionWithName(TABLENAME).getFirst();
+ HConnection conn = HConnectionManager.createConnection(conf);
+ HTableInterface table = conn.getTable(TABLENAME);
+ doPut(table);
+ table.close();
+ conn.close();
+
+ HRegion region = getRegionWithName(TABLENAME).getFirst();
+ int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
+ int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
+ int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
+ TEST_UTIL.shutdownMiniCluster();
+
+ LOG.info("==============Test with selective flush enabled===============");
+ conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+ FlushLargeStoresPolicy.class.getName());
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.getHBaseAdmin().createNamespace(
+ NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ conn = HConnectionManager.createConnection(conf);
+ table = conn.getTable(TABLENAME);
+ doPut(table);
+ table.close();
+ conn.close();
+
+ region = getRegionWithName(TABLENAME).getFirst();
+ int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
+ int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
+ int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
+ TEST_UTIL.shutdownMiniCluster();
+
+ LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
+ + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
+ + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
+ LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
+ + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
+ + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
+ // small CF will have less store files.
+ assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
+ assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
+ }
+
+ public static void main(String[] args) throws Exception {
+ int numRegions = Integer.parseInt(args[0]);
+ long numRows = Long.parseLong(args[1]);
+
+ HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+ htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
+ htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+ htd.addFamily(new HColumnDescriptor(FAMILY1));
+ htd.addFamily(new HColumnDescriptor(FAMILY2));
+ htd.addFamily(new HColumnDescriptor(FAMILY3));
+
+ Configuration conf = HBaseConfiguration.create();
+ HConnection conn = HConnectionManager.createConnection(conf);
+ HBaseAdmin admin = new HBaseAdmin(conn);
+ if (admin.tableExists(TABLENAME)) {
+ admin.disableTable(TABLENAME);
+ admin.deleteTable(TABLENAME);
+ }
+ if (numRegions >= 3) {
+ byte[] startKey = new byte[16];
+ byte[] endKey = new byte[16];
+ Arrays.fill(endKey, (byte) 0xFF);
+ admin.createTable(htd, startKey, endKey, numRegions);
+ } else {
+ admin.createTable(htd);
+ }
+ admin.close();
+
+ HTableInterface table = conn.getTable(TABLENAME);
+ byte[] qf = Bytes.toBytes("qf");
+ Random rand = new Random();
+ byte[] value1 = new byte[16];
+ byte[] value2 = new byte[256];
+ byte[] value3 = new byte[4096];
+ for (long i = 0; i < numRows; i++) {
+ Put put = new Put(Hashing.md5().hashLong(i).asBytes());
+ rand.setSeed(i);
+ rand.nextBytes(value1);
+ rand.nextBytes(value2);
+ rand.nextBytes(value3);
+ put.add(FAMILY1, qf, value1);
+ put.add(FAMILY2, qf, value2);
+ put.add(FAMILY3, qf, value3);
+ table.put(put);
+ if (i % 10000 == 0) {
+ LOG.info(i + " rows put");
+ }
+ }
+ table.close();
+ conn.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 6182cca..970b0f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,8 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -158,18 +159,15 @@ public class TestFSHLog {
}
}
- protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
- int times, AtomicLong sequenceId) throws IOException {
- HTableDescriptor htd = new HTableDescriptor();
- htd.addFamily(new HColumnDescriptor("row"));
-
- final byte [] row = Bytes.toBytes("row");
+ protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
+ AtomicLong sequenceId) throws IOException {
+ final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
- log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
- sequenceId, true, null);
+ log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+ cols, sequenceId, true, null);
}
log.sync();
}
@@ -179,8 +177,8 @@ public class TestFSHLog {
* @param wal
* @param regionEncodedName
*/
- protected void flushRegion(WAL wal, byte[] regionEncodedName) {
- wal.startCacheFlush(regionEncodedName);
+ protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+ wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
wal.completeCacheFlush(regionEncodedName);
}
@@ -254,10 +252,14 @@ public class TestFSHLog {
conf1.setInt("hbase.regionserver.maxlogs", 1);
FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
- TableName t1 = TableName.valueOf("t1");
- TableName t2 = TableName.valueOf("t2");
- HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HTableDescriptor t1 =
+ new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+ HTableDescriptor t2 =
+ new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
+ HRegionInfo hri1 =
+ new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HRegionInfo hri2 =
+ new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// variables to mock region sequenceIds
final AtomicLong sequenceId1 = new AtomicLong(1);
final AtomicLong sequenceId2 = new AtomicLong(1);
@@ -284,12 +286,12 @@ public class TestFSHLog {
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
// remain.
- flushRegion(wal, hri1.getEncodedNameAsBytes());
+ flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
wal.rollWriter();
// only one wal should remain now (that is for the second region).
assertEquals(1, wal.getNumRolledLogFiles());
// flush the second region
- flushRegion(wal, hri2.getEncodedNameAsBytes());
+ flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
wal.rollWriter(true);
// no wal should remain now.
assertEquals(0, wal.getNumRolledLogFiles());
@@ -306,14 +308,14 @@ public class TestFSHLog {
regionsToFlush = wal.findRegionsToForceFlush();
assertEquals(2, regionsToFlush.length);
// flush both regions
- flushRegion(wal, hri1.getEncodedNameAsBytes());
- flushRegion(wal, hri2.getEncodedNameAsBytes());
+ flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
+ flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
wal.rollWriter(true);
assertEquals(0, wal.getNumRolledLogFiles());
// Add an edit to region1, and roll the wal.
addEdits(wal, hri1, t1, 2, sequenceId1);
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
- wal.startCacheFlush(hri1.getEncodedNameAsBytes());
+ wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
wal.rollWriter();
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
assertEquals(1, wal.getNumRolledLogFiles());
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index f3f2ebe..6cdfe3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -27,7 +27,10 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -787,13 +790,15 @@ public class TestWALReplay {
// Add 1k to each family.
final int countPerFamily = 1000;
+ Set<byte[]> familyNames = new HashSet<byte[]>();
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
ee, wal, htd, sequenceId);
+ familyNames.add(hcd.getName());
}
// Add a cache flush, shouldn't have any effect
- wal.startCacheFlush(regionName);
+ wal.startCacheFlush(regionName, familyNames);
wal.completeCacheFlush(regionName);
// Add an edit to another family, should be skipped.
@@ -833,11 +838,11 @@ public class TestWALReplay {
final HRegion region =
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
@Override
- protected FlushResult internalFlushcache(
- final WAL wal, final long myseqid, MonitoredTask status)
+ protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+ Collection<Store> storesToFlush, MonitoredTask status)
throws IOException {
LOG.info("InternalFlushCache Invoked");
- FlushResult fs = super.internalFlushcache(wal, myseqid,
+ FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
Mockito.mock(MonitoredTask.class));
flushcount.incrementAndGet();
return fs;
@@ -959,16 +964,16 @@ public class TestWALReplay {
private HRegion r;
@Override
- public void requestFlush(HRegion region) {
+ public void requestFlush(HRegion region, boolean forceFlushAllStores) {
try {
- r.flushcache();
+ r.flushcache(forceFlushAllStores);
} catch (IOException e) {
throw new RuntimeException("Exception flushing", e);
}
}
@Override
- public void requestDelayedFlush(HRegion region, long when) {
+ public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
// TODO Auto-generated method stub
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index e008b60..df8ceaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -147,18 +147,15 @@ public class TestDefaultWALProvider {
}
- protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+ protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
int times, AtomicLong sequenceId) throws IOException {
- HTableDescriptor htd = new HTableDescriptor();
- htd.addFamily(new HColumnDescriptor("row"));
-
- final byte [] row = Bytes.toBytes("row");
+ final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
- log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
- sequenceId, true, null);
+ log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+ cols, sequenceId, true, null);
}
log.sync();
}
@@ -175,8 +172,8 @@ public class TestDefaultWALProvider {
* @param wal
* @param regionEncodedName
*/
- protected void flushRegion(WAL wal, byte[] regionEncodedName) {
- wal.startCacheFlush(regionEncodedName);
+ protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+ wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
wal.completeCacheFlush(regionEncodedName);
}
@@ -185,45 +182,47 @@ public class TestDefaultWALProvider {
@Test
public void testLogCleaning() throws Exception {
LOG.info("testLogCleaning");
- final TableName tableName =
- TableName.valueOf("testLogCleaning");
- final TableName tableName2 =
- TableName.valueOf("testLogCleaning2");
+ final HTableDescriptor htd =
+ new HTableDescriptor(TableName.valueOf("testLogCleaning")).addFamily(new HColumnDescriptor(
+ "row"));
+ final HTableDescriptor htd2 =
+ new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
+ .addFamily(new HColumnDescriptor("row"));
final Configuration localConf = new Configuration(conf);
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
final AtomicLong sequenceId = new AtomicLong(1);
try {
- HRegionInfo hri = new HRegionInfo(tableName,
+ HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- HRegionInfo hri2 = new HRegionInfo(tableName2,
+ HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// we want to mix edits from regions, so pick our own identifier.
final WAL log = wals.getWAL(UNSPECIFIED_REGION);
// Add a single edit and make sure that rolling won't remove the file
// Before HBASE-3198 it used to delete it
- addEdits(log, hri, tableName, 1, sequenceId);
+ addEdits(log, hri, htd, 1, sequenceId);
log.rollWriter();
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
// See if there's anything wrong with more than 1 edit
- addEdits(log, hri, tableName, 2, sequenceId);
+ addEdits(log, hri, htd, 2, sequenceId);
log.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
// Now mix edits from 2 regions, still no flushing
- addEdits(log, hri, tableName, 1, sequenceId);
- addEdits(log, hri2, tableName2, 1, sequenceId);
- addEdits(log, hri, tableName, 1, sequenceId);
- addEdits(log, hri2, tableName2, 1, sequenceId);
+ addEdits(log, hri, htd, 1, sequenceId);
+ addEdits(log, hri2, htd2, 1, sequenceId);
+ addEdits(log, hri, htd, 1, sequenceId);
+ addEdits(log, hri2, htd2, 1, sequenceId);
log.rollWriter();
assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
// Flush the first region, we expect to see the first two files getting
// archived. We need to append something or writer won't be rolled.
- addEdits(log, hri2, tableName2, 1, sequenceId);
- log.startCacheFlush(hri.getEncodedNameAsBytes());
+ addEdits(log, hri2, htd2, 1, sequenceId);
+ log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -231,8 +230,8 @@ public class TestDefaultWALProvider {
// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
// flush information
- addEdits(log, hri2, tableName2, 1, sequenceId);
- log.startCacheFlush(hri2.getEncodedNameAsBytes());
+ addEdits(log, hri2, htd2, 1, sequenceId);
+ log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
log.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -255,21 +254,25 @@ public class TestDefaultWALProvider {
* <p>
* @throws IOException
*/
- @Test
+ @Test
public void testWALArchiving() throws IOException {
LOG.debug("testWALArchiving");
- TableName table1 = TableName.valueOf("t1");
- TableName table2 = TableName.valueOf("t2");
+ HTableDescriptor table1 =
+ new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+ HTableDescriptor table2 =
+ new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
final Configuration localConf = new Configuration(conf);
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
try {
final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
- HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
- HConstants.EMPTY_END_ROW);
- HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
- HConstants.EMPTY_END_ROW);
+ HRegionInfo hri1 =
+ new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW);
+ HRegionInfo hri2 =
+ new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW);
// ensure that we don't split the regions.
hri1.setSplit(false);
hri2.setSplit(false);
@@ -288,7 +291,7 @@ public class TestDefaultWALProvider {
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// add a waledit to table1, and flush the region.
addEdits(wal, hri1, table1, 3, sequenceId1);
- flushRegion(wal, hri1.getEncodedNameAsBytes());
+ flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
// roll log; all old logs should be archived.
wal.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
@@ -302,7 +305,7 @@ public class TestDefaultWALProvider {
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits for table2, and flush hri1.
addEdits(wal, hri2, table2, 2, sequenceId2);
- flushRegion(wal, hri1.getEncodedNameAsBytes());
+ flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
// the log : region-sequenceId map is
// log1: region2 (unflushed)
// log2: region1 (flushed)
@@ -312,7 +315,7 @@ public class TestDefaultWALProvider {
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// flush region2, and all logs should be archived.
addEdits(wal, hri2, table2, 2, sequenceId2);
- flushRegion(wal, hri2.getEncodedNameAsBytes());
+ flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
wal.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 47b001a..bbe4018 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -480,8 +480,9 @@ public class TestWALFactory {
@Test
public void testEditAdd() throws IOException {
final int COL_COUNT = 10;
- final TableName tableName =
- TableName.valueOf("tablename");
+ final HTableDescriptor htd =
+ new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+ "column"));
final byte [] row = Bytes.toBytes("row");
WAL.Reader reader = null;
try {
@@ -496,16 +497,15 @@ public class TestWALFactory {
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
- HRegionInfo info = new HRegionInfo(tableName,
+ HRegionInfo info = new HRegionInfo(htd.getTableName(),
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
- HTableDescriptor htd = new HTableDescriptor();
- htd.addFamily(new HColumnDescriptor("column"));
final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
- final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), cols, sequenceId, true, null);
+ final long txid = log.append(htd, info,
+ new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+ cols, sequenceId, true, null);
log.sync(txid);
- log.startCacheFlush(info.getEncodedNameAsBytes());
+ log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
log.completeCacheFlush(info.getEncodedNameAsBytes());
log.shutdown();
Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -519,7 +519,7 @@ public class TestWALFactory {
WALKey key = entry.getKey();
WALEdit val = entry.getEdit();
assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
- assertTrue(tableName.equals(key.getTablename()));
+ assertTrue(htd.getTableName().equals(key.getTablename()));
Cell cell = val.getCells().get(0);
assertTrue(Bytes.equals(row, cell.getRow()));
assertEquals((byte)(i + '0'), cell.getValue()[0]);
@@ -538,8 +538,9 @@ public class TestWALFactory {
@Test
public void testAppend() throws IOException {
final int COL_COUNT = 10;
- final TableName tableName =
- TableName.valueOf("tablename");
+ final HTableDescriptor htd =
+ new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+ "column"));
final byte [] row = Bytes.toBytes("row");
WAL.Reader reader = null;
final AtomicLong sequenceId = new AtomicLong(1);
@@ -553,15 +554,14 @@ public class TestWALFactory {
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
- HRegionInfo hri = new HRegionInfo(tableName,
+ HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- HTableDescriptor htd = new HTableDescriptor();
- htd.addFamily(new HColumnDescriptor("column"));
final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
- final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), cols, sequenceId, true, null);
+ final long txid = log.append(htd, hri,
+ new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+ cols, sequenceId, true, null);
log.sync(txid);
- log.startCacheFlush(hri.getEncodedNameAsBytes());
+ log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.shutdown();
Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -573,7 +573,7 @@ public class TestWALFactory {
for (Cell val : entry.getEdit().getCells()) {
assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
entry.getKey().getEncodedRegionName()));
- assertTrue(tableName.equals(entry.getKey().getTablename()));
+ assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
assertTrue(Bytes.equals(row, val.getRow()));
assertEquals((byte)(idx + '0'), val.getValue()[0]);
System.out.println(entry.getKey() + " " + val);
[2/2] hbase git commit: HBASE-10201 Port 'Make flush decisions per
column family' to trunk
Posted by st...@apache.org.
HBASE-10201 Port 'Make flush decisions per column family' to trunk
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c7fad665
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c7fad665
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c7fad665
Branch: refs/heads/master
Commit: c7fad665f34fd3c17999d5cc60b04d3faff6a7f5
Parents: a411227
Author: zhangduo <zh...@wandoujia.com>
Authored: Sat Dec 13 12:49:38 2014 +0800
Committer: stack <st...@apache.org>
Committed: Tue Dec 16 11:49:17 2014 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 24 +
.../src/main/resources/hbase-default.xml | 19 +-
.../regionserver/FlushAllStoresPolicy.java | 35 +
.../regionserver/FlushLargeStoresPolicy.java | 108 ++++
.../hadoop/hbase/regionserver/FlushPolicy.java | 49 ++
.../hbase/regionserver/FlushPolicyFactory.java | 76 +++
.../hbase/regionserver/FlushRequester.java | 15 +-
.../hadoop/hbase/regionserver/HRegion.java | 314 ++++++---
.../hbase/regionserver/HRegionServer.java | 4 +-
.../hadoop/hbase/regionserver/LogRoller.java | 3 +-
.../hbase/regionserver/MemStoreFlusher.java | 75 ++-
.../hbase/regionserver/RSRpcServices.java | 10 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 242 +++++--
.../hbase/regionserver/wal/FSWALEntry.java | 29 +-
.../hadoop/hbase/wal/DisabledWALProvider.java | 8 +-
.../java/org/apache/hadoop/hbase/wal/WAL.java | 11 +-
.../org/apache/hadoop/hbase/TestIOFencing.java | 4 +-
.../regionserver/TestFlushRegionEntry.java | 4 +-
.../regionserver/TestHeapMemoryManager.java | 16 +-
.../regionserver/TestPerColumnFamilyFlush.java | 644 +++++++++++++++++++
.../hbase/regionserver/wal/TestFSHLog.java | 42 +-
.../hbase/regionserver/wal/TestWALReplay.java | 19 +-
.../hbase/wal/TestDefaultWALProvider.java | 73 ++-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 36 +-
24 files changed, 1545 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 0ae0538..3f1e070 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -130,6 +130,8 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
private static final Bytes MEMSTORE_FLUSHSIZE_KEY =
new Bytes(Bytes.toBytes(MEMSTORE_FLUSHSIZE));
+ public static final String FLUSH_POLICY = "FLUSH_POLICY";
+
/**
* <em>INTERNAL</em> Used by rest interface to access this metadata
* attribute which denotes if the table is a -ROOT- region or not
@@ -766,6 +768,28 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
}
/**
+ * This sets the class associated with the flush policy which determines the stores
+ * need to be flushed when flushing a region. The class used by default is defined in
+ * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+ * @param clazz the class name
+ */
+ public HTableDescriptor setFlushPolicyClassName(String clazz) {
+ setValue(FLUSH_POLICY, clazz);
+ return this;
+ }
+
+ /**
+ * This gets the class associated with the flush policy which determines the stores that need to be
+ * flushed when flushing a region. The class used by default is defined in
+ * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+ * @return the class name of the flush policy for this table. If this returns null, the default
+ * flush policy is used.
+ */
+ public String getFlushPolicyClassName() {
+ return getValue(FLUSH_POLICY);
+ }
+
+ /**
* Adds a column family.
* For the updating purpose please use {@link #modifyFamily(HColumnDescriptor)} instead.
* @param family HColumnDescriptor of family to add.
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 9d76dad..34deaec 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,7 +187,7 @@ possible configurations would overwhelm and obscure the important.
A value of 0 means a single queue shared between all the handlers.
A value of 1 means that each handler has its own queue.</description>
</property>
-<property>
+ <property>
<name>hbase.ipc.server.callqueue.read.ratio</name>
<value>0</value>
<description>Split the call queues into read and write queues.
@@ -329,8 +329,8 @@ possible configurations would overwhelm and obscure the important.
<value>org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy</value>
<description>
A split policy determines when a region should be split. The various other split policies that
- are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy,
- DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
+ are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy,
+ DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
</description>
</property>
@@ -588,6 +588,19 @@ possible configurations would overwhelm and obscure the important.
every hbase.server.thread.wakefrequency.</description>
</property>
<property>
+ <name>hbase.hregion.percolumnfamilyflush.size.lower.bound</name>
+ <value>16777216</value>
+ <description>
+ If FlushLargeStoresPolicy is used, then every time that we hit the
+ total memstore limit, we find out all the column families whose memstores
+ exceed this value, and only flush them, while retaining the others whose
+ memstores are lower than this limit. If none of the families have their
+ memstore size more than this, all the memstores will be flushed
+ (just as usual). This value should be less than half of the total memstore
+ threshold (hbase.hregion.memstore.flush.size).
+ </description>
+ </property>
+ <property>
<name>hbase.hregion.preclose.flush.size</name>
<value>5242880</value>
<description>
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
new file mode 100644
index 0000000..0058104
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that always flushes all stores for a given region.
+ */
+@InterfaceAudience.Private
+public class FlushAllStoresPolicy extends FlushPolicy {
+
+ @Override
+ public Collection<Store> selectStoresToFlush() {
+ return region.stores.values();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
new file mode 100644
index 0000000..7e0e54c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If no store is large
+ * enough, then all stores will be flushed.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushLargeStoresPolicy extends FlushPolicy {
+
+ private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
+
+ public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
+ "hbase.hregion.percolumnfamilyflush.size.lower.bound";
+
+ private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
+
+ private long flushSizeLowerBound;
+
+ @Override
+ protected void configureForRegion(HRegion region) {
+ super.configureForRegion(region);
+ long flushSizeLowerBound;
+ String flushedSizeLowerBoundString =
+ region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+ if (flushedSizeLowerBoundString == null) {
+ flushSizeLowerBound =
+ getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+ DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
+ + " is not specified, use global config(" + flushSizeLowerBound + ") instead");
+ }
+ } else {
+ try {
+ flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
+ } catch (NumberFormatException nfe) {
+ flushSizeLowerBound =
+ getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+ DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+ LOG.warn("Number format exception when parsing "
+ + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
+ + region.getTableDesc().getTableName() + ":" + flushedSizeLowerBoundString + ". " + nfe
+ + ", use global config(" + flushSizeLowerBound + ") instead");
+
+ }
+ }
+ this.flushSizeLowerBound = flushSizeLowerBound;
+ }
+
+ private boolean shouldFlush(Store store) {
+ if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
+ + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
+ + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
+ }
+ return true;
+ }
+ return region.shouldFlushStore(store);
+ }
+
+ @Override
+ public Collection<Store> selectStoresToFlush() {
+ Collection<Store> stores = region.stores.values();
+ Set<Store> specificStoresToFlush = new HashSet<Store>();
+ for (Store store : stores) {
+ if (shouldFlush(store)) {
+ specificStoresToFlush.add(store);
+ }
+ }
+ // Didn't find any CFs which were above the threshold for selection.
+ if (specificStoresToFlush.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Since none of the CFs were above the size, flushing all.");
+ }
+ return stores;
+ } else {
+ return specificStoresToFlush;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
new file mode 100644
index 0000000..d581fee
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A flush policy determines the stores that need to be flushed when flushing a region.
+ */
+@InterfaceAudience.Private
+public abstract class FlushPolicy extends Configured {
+
+ /**
+ * The region configured for this flush policy.
+ */
+ protected HRegion region;
+
+ /**
+ * Upon construction, this method will be called with the region to be governed. It will be called
+ * once and only once.
+ */
+ protected void configureForRegion(HRegion region) {
+ this.region = region;
+ }
+
+ /**
+ * @return the stores that need to be flushed.
+ */
+ public abstract Collection<Store> selectStoresToFlush();
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
new file mode 100644
index 0000000..e80b696
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The class that creates a flush policy from a conf and HTableDescriptor.
+ * <p>
+ * The default flush policy is {@link FlushLargeStoresPolicy}. And for 0.98, the default flush
+ * policy is {@link FlushAllStoresPolicy}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushPolicyFactory {
+
+ private static final Log LOG = LogFactory.getLog(FlushPolicyFactory.class);
+
+ public static final String HBASE_FLUSH_POLICY_KEY = "hbase.regionserver.flush.policy";
+
+ private static final Class<? extends FlushPolicy> DEFAULT_FLUSH_POLICY_CLASS =
+ FlushLargeStoresPolicy.class;
+
+ /**
+ * Create the FlushPolicy configured for the given table.
+ */
+ public static FlushPolicy create(HRegion region, Configuration conf) throws IOException {
+ Class<? extends FlushPolicy> clazz = getFlushPolicyClass(region.getTableDesc(), conf);
+ FlushPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+ policy.configureForRegion(region);
+ return policy;
+ }
+
+ /**
+ * Get FlushPolicy class for the given table.
+ */
+ public static Class<? extends FlushPolicy> getFlushPolicyClass(HTableDescriptor htd,
+ Configuration conf) throws IOException {
+ String className = htd.getFlushPolicyClassName();
+ if (className == null) {
+ className = conf.get(HBASE_FLUSH_POLICY_KEY, DEFAULT_FLUSH_POLICY_CLASS.getName());
+ }
+ try {
+ Class<? extends FlushPolicy> clazz = Class.forName(className).asSubclass(FlushPolicy.class);
+ return clazz;
+ } catch (Exception e) {
+ LOG.warn(
+ "Unable to load configured flush policy '" + className + "' for table '"
+ + htd.getTableName() + "', load default flush policy "
+ + DEFAULT_FLUSH_POLICY_CLASS.getName() + " instead", e);
+ return DEFAULT_FLUSH_POLICY_CLASS;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index e1c3144..7517454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -30,26 +30,31 @@ public interface FlushRequester {
* Tell the listener the cache needs to be flushed.
*
* @param region the HRegion requesting the cache flush
+ * @param forceFlushAllStores whether we want to flush all stores, e.g., when the request comes
+ * from log rolling.
*/
- void requestFlush(HRegion region);
+ void requestFlush(HRegion region, boolean forceFlushAllStores);
+
/**
* Tell the listener the cache needs to be flushed after a delay
*
* @param region the HRegion requesting the cache flush
* @param delay after how much time should the flush happen
+ * @param forceFlushAllStores whether we want to flush all stores, e.g., when the request comes
+ * from log rolling.
*/
- void requestDelayedFlush(HRegion region, long delay);
+ void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
/**
* Register a FlushRequestListener
- *
+ *
* @param listener
*/
void registerFlushRequestListener(final FlushRequestListener listener);
/**
* Unregister the given FlushRequestListener
- *
+ *
* @param listener
* @return true when passed listener is unregistered successfully.
*/
@@ -57,7 +62,7 @@ public interface FlushRequester {
/**
* Sets the global memstore limit to a new size.
- *
+ *
* @param globalMemStoreSize
*/
public void setGlobalMemstoreLimit(long globalMemStoreSize);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 428e857..6cf2ce3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -62,7 +64,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -134,14 +136,9 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.Write
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
@@ -157,6 +154,11 @@ import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;
@@ -230,10 +232,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final AtomicBoolean closing = new AtomicBoolean(false);
/**
- * The sequence id of the last flush on this region. Used doing some rough calculations on
+ * The max sequence id of flushed data on this region. Used doing some rough calculations on
* whether time to flush or not.
*/
- protected volatile long lastFlushSeqId = -1L;
+ protected volatile long maxFlushedSeqId = -1L;
/**
* Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
@@ -518,7 +520,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long memstoreFlushSize;
final long timestampSlop;
final long rowProcessorTimeout;
- private volatile long lastFlushTime;
+
+ // Last flush time for each Store. Useful when we are flushing for each column
+ private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
+ new ConcurrentHashMap<Store, Long>();
+
final RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
private long flushCheckInterval;
@@ -542,6 +548,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
private HTableDescriptor htableDescriptor = null;
private RegionSplitPolicy splitPolicy;
+ private FlushPolicy flushPolicy;
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
@@ -619,7 +626,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
+ MAX_FLUSH_PER_CHANGES);
}
-
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
@@ -785,8 +791,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Initialize split policy
this.splitPolicy = RegionSplitPolicy.create(this, conf);
- this.lastFlushTime = EnvironmentEdgeManager.currentTime();
- // Use maximum of wal sequenceid or that which was found in stores
+ // Initialize flush policy
+ this.flushPolicy = FlushPolicyFactory.create(this, conf);
+
+ long lastFlushTime = EnvironmentEdgeManager.currentTime();
+ for (Store store: stores.values()) {
+ this.lastStoreFlushTimeMap.put(store, lastFlushTime);
+ }
+
+ // Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId;
@@ -1324,10 +1337,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
}
- if ( this.metricsRegion != null) {
+ if (this.metricsRegion != null) {
this.metricsRegion.close();
}
- if ( this.metricsRegionWrapper != null) {
+ if (this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
status.markComplete("Closed");
@@ -1473,9 +1486,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this.fs;
}
- /** @return the last time the region was flushed */
- public long getLastFlushTime() {
- return this.lastFlushTime;
+ /**
+ * @return Returns the earliest time a store in the region was flushed. All
+ * other stores in the region would have been flushed either at, or
+ * after this time.
+ */
+ @VisibleForTesting
+ public long getEarliestFlushTimeForAllStores() {
+ return Collections.min(lastStoreFlushTimeMap.values());
}
//////////////////////////////////////////////////////////////////////////////
@@ -1641,6 +1659,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
+ * Flush all stores.
+ * <p>
+ * See {@link #flushcache(boolean)}.
+ *
+ * @return whether the flush succeeded and whether the region needs compacting
+ * @throws IOException
+ */
+ public FlushResult flushcache() throws IOException {
+ return flushcache(true);
+ }
+
+ /**
* Flush the cache.
*
* When this method is called the cache will be flushed unless:
@@ -1653,14 +1683,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
- *
- * @return true if the region needs compacting
+ * @param forceFlushAllStores whether we want to flush all stores
+ * @return whether the flush succeeded and whether the region needs compacting
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
- public FlushResult flushcache() throws IOException {
+ public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
@@ -1702,8 +1732,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
}
+
try {
- FlushResult fs = internalFlushcache(status);
+ Collection<Store> specificStoresToFlush =
+ forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
+ FlushResult fs = internalFlushcache(specificStoresToFlush, status);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
@@ -1726,12 +1759,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
+ * Should the store be flushed because it is old enough.
+ * <p>
+ * Every FlushPolicy should call this to determine whether a store is old enough to flush (unless
+ * you always flush all stores). Otherwise the {@link #shouldFlush()} method will always return
+ * true, which will trigger a lot of flush requests.
+ */
+ boolean shouldFlushStore(Store store) {
+ long maxFlushedSeqId =
+ this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
+ .getFamily().getName()) - 1;
+ if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+ + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
+ + ") is far away from current(" + sequenceId.get() + "), max allowed is "
+ + flushPerChanges);
+ }
+ return true;
+ }
+ if (flushCheckInterval <= 0) {
+ return false;
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ if (store.timeOfOldestEdit() < now - flushCheckInterval) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+ + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
+ + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Should the memstore be flushed now
*/
boolean shouldFlush() {
// This is a rough measure.
- if (this.lastFlushSeqId > 0
- && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
+ if (this.maxFlushedSeqId > 0
+ && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
return true;
}
if (flushCheckInterval <= 0) { //disabled
@@ -1739,7 +1807,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
long now = EnvironmentEdgeManager.currentTime();
//if we flushed in the recent past, we don't need to do again now
- if ((now - getLastFlushTime() < flushCheckInterval)) {
+ if ((now - getEarliestFlushTimeForAllStores() < flushCheckInterval)) {
return false;
}
//since we didn't flush in the recent past, flush now if certain conditions
@@ -1754,35 +1822,56 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
- * memstore, all of which have also been written to the wal. We need to write those updates in the
- * memstore out to disk, while being able to process reads/writes as much as possible during the
- * flush operation.
- * <p>This method may block for some time. Every time you call it, we up the regions
- * sequence id even if we don't flush; i.e. the returned region id will be at least one larger
- * than the last edit applied to this region. The returned id does not refer to an actual edit.
- * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
- * that was the result of this flush, etc.
- * @return object describing the flush's state
+ * Flushing all stores.
*
- * @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of wal is required
- * because a Snapshot was not properly persisted.
+ * @see #internalFlushcache(Collection, MonitoredTask)
*/
- protected FlushResult internalFlushcache(MonitoredTask status)
+ private FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
- return internalFlushcache(this.wal, -1, status);
+ return internalFlushcache(stores.values(), status);
+ }
+
+ /**
+ * Flushing given stores.
+ *
+ * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
+ */
+ private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
+ MonitoredTask status) throws IOException {
+ return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
+ status);
}
/**
- * @param wal Null if we're NOT to go via wal.
- * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
+ * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
+ * of updates in the memstore, all of which have also been written to the wal.
+ * We need to write those updates in the memstore out to disk, while being
+ * able to process reads/writes as much as possible during the flush
+ * operation.
+ * <p>
+ * This method may block for some time. Every time you call it, we up the
+ * regions sequence id even if we don't flush; i.e. the returned region id
+ * will be at least one larger than the last edit applied to this region. The
+ * returned id does not refer to an actual edit. The returned id can be used
+ * for say installing a bulk loaded file just ahead of the last hfile that was
+ * the result of this flush, etc.
+ *
+ * @param wal
+ * Null if we're NOT to go via wal.
+ * @param myseqid
+ * The seqid to use if <code>wal</code> is null writing out flush
+ * file.
+ * @param storesToFlush
+ * The list of stores to flush.
* @return object describing the flush's state
* @throws IOException
- * @see #internalFlushcache(MonitoredTask)
+ * general io exceptions
+ * @throws DroppedSnapshotException
+ * Thrown when replay of wal is required because a Snapshot was not
+ * properly persisted.
*/
- protected FlushResult internalFlushcache(
- final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
+ protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+ final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1824,63 +1913,86 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- LOG.info("Started memstore flush for " + this +
- ", current region memstore size " +
- StringUtils.byteDesc(this.memstoreSize.get()) +
- ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
-
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Started memstore flush for " + this + ", current region memstore size "
+ + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
+ + stores.size() + " column families' memstores are being flushed."
+ + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
+ // only log when we are not flushing all stores.
+ if (this.stores.size() > storesToFlush.size()) {
+ for (Store store: storesToFlush) {
+ LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
+ + " which was occupying "
+ + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
+ }
+ }
+ }
// Stop updates while we snapshot the memstore of all of these regions' stores. We only have
// to do this for a moment. It is quick. We also set the memstore size to zero here before we
// allow updates again so its value will represent the size of the updates received
// during flush
MultiVersionConsistencyControl.WriteEntry w = null;
-
// We have to take an update lock during snapshot, or else a write could end up in both snapshot
// and memstore (makes it difficult to do atomic rows then)
status.setStatus("Obtaining lock to block concurrent updates");
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
- long totalFlushableSize = 0;
status.setStatus("Preparing to flush by snapshotting stores in " +
getRegionInfo().getEncodedName());
+ long totalFlushableSizeOfFlushableStores = 0;
+
+ Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+ for (Store store: storesToFlush) {
+ flushedFamilyNames.add(store.getFamily().getName());
+ }
+
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
- long flushSeqId = -1L;
+ // The sequence id of this flush operation which is used to log FlushMarker and pass to
+ // createFlushContext to use as the store file's sequence id.
+ long flushOpSeqId = HConstants.NO_SEQNUM;
+ // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
+ // passed to HMaster.
+ long flushedSeqId = HConstants.NO_SEQNUM;
+ byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
long trxId = 0;
try {
try {
w = mvcc.beginMemstoreInsert();
if (wal != null) {
- if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+ if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
// This should never happen.
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
- // Get a sequence id that we can use to denote the flush. It will be one beyond the last
- // edit that made it into the hfile (the below does not add an edit, it just asks the
- // WAL system to return next sequence edit).
- flushSeqId = getNextSequenceId(wal);
+ flushOpSeqId = getNextSequenceId(wal);
+ long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
+ // no oldestUnflushedSeqId means we flushed all stores.
+ // or the unflushed stores are all empty.
+ flushedSeqId =
+ oldestUnflushedSeqId == HConstants.NO_SEQNUM ? flushOpSeqId : oldestUnflushedSeqId - 1;
} else {
// use the provided sequence Id as WAL is not being used for this flush.
- flushSeqId = myseqid;
+ flushedSeqId = flushOpSeqId = myseqid;
}
- for (Store s : stores.values()) {
- totalFlushableSize += s.getFlushableSize();
- storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+ for (Store s : storesToFlush) {
+ totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+ storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
}
// write the snapshot start to WAL
if (wal != null) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
+ // no sync. Sync is below where we do not hold the updates lock
trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
+ desc, sequenceId, false);
}
// Prepare flush (take a snapshot)
@@ -1892,7 +2004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
@@ -1909,7 +2021,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.updatesLock.writeLock().unlock();
}
String s = "Finished memstore snapshotting " + this +
- ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+ ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// sync unflushed WAL changes
@@ -1928,7 +2040,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
- w.setWriteNumber(flushSeqId);
+ w.setWriteNumber(flushOpSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;
@@ -1959,8 +2071,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
- Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
- // same order
+ Iterator<Store> it = storesToFlush.iterator();
+ // stores.values() and storeFlushCtxs have same order
for (StoreFlushContext flush : storeFlushCtxs) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
@@ -1971,12 +2083,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
storeFlushCtxs.clear();
// Set down the memstore size by amount of flush.
- this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+ this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
if (wal != null) {
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
@@ -1990,7 +2102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (wal != null) {
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
@@ -2013,10 +2125,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Record latest flush time
- this.lastFlushTime = EnvironmentEdgeManager.currentTime();
+ for (Store store: storesToFlush) {
+ this.lastStoreFlushTimeMap.put(store, startTime);
+ }
- // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
- this.lastFlushSeqId = flushSeqId;
+ // Update the oldest unflushed sequence id for region.
+ this.maxFlushedSeqId = flushedSeqId;
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@@ -2026,18 +2140,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long time = EnvironmentEdgeManager.currentTime() - startTime;
long memstoresize = this.memstoreSize.get();
- String msg = "Finished memstore flush of ~" +
- StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
- ", currentsize=" +
- StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
- " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
- ", compaction requested=" + compactionRequested +
- ((wal == null)? "; wal=null": "");
+ String msg = "Finished memstore flush of ~"
+ + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
+ + totalFlushableSizeOfFlushableStores + ", currentsize="
+ + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ + " for region " + this + " in " + time + "ms, sequenceid="
+ + flushOpSeqId + ", compaction requested=" + compactionRequested
+ + ((wal == null) ? "; wal=null" : "");
LOG.info(msg);
status.setStatus(msg);
return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
- FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
+ FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
}
/**
@@ -2168,7 +2282,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if(delete.getFamilyCellMap().isEmpty()){
for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
// Don't eat the timestamp
- delete.deleteFamily(family, delete.getTimeStamp());
+ delete.addFamily(family, delete.getTimeStamp());
}
} else {
for(byte [] family : delete.getFamilyCellMap().keySet()) {
@@ -2819,6 +2933,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
coprocessorHost.postBatchMutate(miniBatchOp);
}
+
// ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------
@@ -2850,7 +2965,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
success = true;
return addedSize;
} finally {
-
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
rollbackMemstore(memstoreCells);
@@ -3209,8 +3323,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* We throw RegionTooBusyException if above memstore limit
* and expect client to retry using some kind of backoff
*/
- private void checkResources()
- throws RegionTooBusyException {
+ private void checkResources() throws RegionTooBusyException {
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return;
@@ -3406,7 +3519,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
writestate.flushRequested = true;
}
// Make request outside of synchronize block; HBASE-818.
- this.rsServices.getFlushRequester().requestFlush(this);
+ this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this);
}
@@ -3527,7 +3640,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
- internalFlushcache(null, seqid, status);
+ internalFlushcache(null, seqid, stores.values(), status);
}
// Now delete the content of recovered edits. We're done w/ them.
for (Path file: files) {
@@ -3681,7 +3794,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
editsCount++;
}
if (flush) {
- internalFlushcache(null, currentEditSeqId, status);
+ internalFlushcache(null, currentEditSeqId, stores.values(), status);
}
if (coprocessorHost != null) {
@@ -4029,7 +4142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
// a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) {
- FlushResult fs = this.flushcache();
+ FlushResult fs = this.flushcache(true);
if (fs.isFlushSucceeded()) {
seqId = fs.flushSequenceId;
} else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -5072,8 +5185,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
FileSystem fs = a.getRegionFileSystem().getFileSystem();
// Make sure each region's cache is empty
- a.flushcache();
- b.flushcache();
+ a.flushcache(true);
+ b.flushcache(true);
// Compact each region so we only have one store file per family
a.compactStores(true);
@@ -5187,7 +5300,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// do after lock
if (this.metricsRegion != null) {
- long totalSize = 0l;
+ long totalSize = 0L;
for (Cell cell : results) {
totalSize += CellUtil.estimatedSerializedSizeOf(cell);
}
@@ -5369,7 +5482,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
-
// 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
@@ -5497,7 +5609,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
WALEdit walEdits = null;
List<Cell> allKVs = new ArrayList<Cell>(append.size());
Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
long size = 0;
long txid = 0;
@@ -5699,7 +5810,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
-
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@@ -5996,8 +6106,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (12 * Bytes.SIZEOF_LONG) +
+ 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ (11 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
@@ -6568,6 +6678,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this.maxSeqIdInStores;
}
+ @VisibleForTesting
+ public long getOldestSeqIdOfStore(byte[] familyName) {
+ return wal.getEarliestMemstoreSeqNum(getRegionInfo()
+ .getEncodedNameAsBytes(), familyName);
+ }
+
/**
* @return if a given region is in compaction now.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9bd7abc..c20f728 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1392,7 +1392,7 @@ public class HRegionServer extends HasThread implements
.setWriteRequestsCount(r.writeRequestsCount.get())
.setTotalCompactingKVs(totalCompactingKVs)
.setCurrentCompactedKVs(currentCompactedKVs)
- .setCompleteSequenceId(r.lastFlushSeqId)
+ .setCompleteSequenceId(r.maxFlushedSeqId)
.setDataLocality(dataLocality);
return regionLoadBldr.build();
@@ -1488,7 +1488,7 @@ public class HRegionServer extends HasThread implements
//Throttle the flushes by putting a delay. If we don't throttle, and there
//is a balanced write-load on the regions in a table, we might end up
//overwhelming the filesystem with too many flushes at once.
- requester.requestDelayedFlush(r, randomDelay);
+ requester.requestDelayedFlush(r, randomDelay, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 6f5dfa4..aa60bfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -166,7 +166,8 @@ class LogRoller extends HasThread {
if (r != null) {
requester = this.services.getFlushRequester();
if (requester != null) {
- requester.requestFlush(r);
+ // force flushing all stores to clean old logs
+ requester.requestFlush(r, true);
scheduled = true;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 1d59701..eece27a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -39,10 +39,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
@@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.htrace.Trace;
import org.htrace.TraceScope;
@@ -114,11 +114,11 @@ class MemStoreFlusher implements FlushRequester {
90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
this.flushHandlers = new FlushHandler[handlerCount];
- LOG.info("globalMemStoreLimit=" +
- StringUtils.humanReadableInt(this.globalMemStoreLimit) +
- ", globalMemStoreLimitLowMark=" +
- StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
- ", maxHeap=" + StringUtils.humanReadableInt(max));
+ LOG.info("globalMemStoreLimit="
+ + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
+ + ", globalMemStoreLimitLowMark="
+ + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
+ + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
}
public Counter getUpdatesBlockedMsHighWater() {
@@ -160,13 +160,12 @@ class MemStoreFlusher implements FlushRequester {
// lots of little flushes and cause lots of compactions, etc, which just makes
// life worse!
if (LOG.isDebugEnabled()) {
- LOG.debug("Under global heap pressure: " +
- "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
- "store files, but is " +
- StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
- " vs best flushable region's " +
- StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
- ". Choosing the bigger.");
+ LOG.debug("Under global heap pressure: " + "Region "
+ + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is "
+ + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1)
+ + " vs best flushable region's "
+ + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1)
+ + ". Choosing the bigger.");
}
regionToFlush = bestAnyRegion;
} else {
@@ -180,7 +179,7 @@ class MemStoreFlusher implements FlushRequester {
Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
- flushedOne = flushRegion(regionToFlush, true);
+ flushedOne = flushRegion(regionToFlush, true, true);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
" - trying to find a different region to flush.");
@@ -206,7 +205,7 @@ class MemStoreFlusher implements FlushRequester {
if (fqe == null || fqe instanceof WakeupFlushThread) {
if (isAboveLowWaterMark()) {
LOG.debug("Flush thread woke up because memory above low water="
- + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+ + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
if (!flushOneForGlobalPressure()) {
// Wasn't able to flush any region, but we're above low water mark
// This is unlikely to happen, but might happen when closing the
@@ -293,23 +292,23 @@ class MemStoreFlusher implements FlushRequester {
getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
}
- public void requestFlush(HRegion r) {
+ public void requestFlush(HRegion r, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
- FlushRegionEntry fqe = new FlushRegionEntry(r);
+ FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
}
}
- public void requestDelayedFlush(HRegion r, long delay) {
+ public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
- FlushRegionEntry fqe = new FlushRegionEntry(r);
+ FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
@@ -362,7 +361,7 @@ class MemStoreFlusher implements FlushRequester {
}
}
- /*
+ /**
* A flushRegion that checks store file count. If too many, puts the flush
* on delay queue to retry later.
* @param fqe
@@ -406,22 +405,23 @@ class MemStoreFlusher implements FlushRequester {
return true;
}
}
- return flushRegion(region, false);
+ return flushRegion(region, false, fqe.isForceFlushAllStores());
}
- /*
+ /**
* Flush a region.
* @param region Region to flush.
* @param emergencyFlush Set if we are being force flushed. If true the region
* needs to be removed from the flush queue. If false, when we were called
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
- *
+ * @param forceFlushAllStores whether we want to flush all stores.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the log was
* not flushed.
*/
- private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+ private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
+ boolean forceFlushAllStores) {
long startTime = 0;
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
@@ -444,7 +444,7 @@ class MemStoreFlusher implements FlushRequester {
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
- HRegion.FlushResult flushResult = region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
@@ -528,11 +528,12 @@ class MemStoreFlusher implements FlushRequester {
while (isAboveHighWaterMark() && !server.isStopped()) {
if (!blocked) {
startTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Blocking updates on " + server.toString() +
- ": the global memstore size " +
- StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
- " is >= than blocking " +
- StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+ LOG.info("Blocking updates on "
+ + server.toString()
+ + ": the global memstore size "
+ + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
+ .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
+ + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
}
blocked = true;
wakeupFlushThread();
@@ -656,10 +657,13 @@ class MemStoreFlusher implements FlushRequester {
private long whenToExpire;
private int requeueCount = 0;
- FlushRegionEntry(final HRegion r) {
+ private boolean forceFlushAllStores;
+
+ FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
+ this.forceFlushAllStores = forceFlushAllStores;
}
/**
@@ -679,6 +683,13 @@ class MemStoreFlusher implements FlushRequester {
}
/**
+ * @return whether we need to flush all stores.
+ */
+ public boolean isForceFlushAllStores() {
+ return forceFlushAllStores;
+ }
+
+ /**
* @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()
* to whatever you pass.
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 32d59d4..492b26d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -150,8 +149,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
@@ -159,6 +156,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
@@ -694,7 +693,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
-
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@@ -1076,7 +1074,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.info("Flushing " + region.getRegionNameAsString());
boolean shouldFlush = true;
if (request.hasIfOlderThanTs()) {
- shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+ shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
@@ -1093,7 +1091,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
builder.setFlushed(result);
}
- builder.setLastFlushTime(region.getLastFlushTime());
+ builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
return builder.build();
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical