You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 14:35:59 UTC
svn commit: r1446173 [3/5] - in /hbase/branches/hbase-7290v2: ./ bin/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/
hbase-common/src/main/java/org/apache/hadoop/hbase/util/
hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/...
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Feb 14 13:35:54 2013
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.util.FSUt
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -104,9 +106,10 @@ import com.google.common.collect.Lists;
* <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager.
*/
-//TODO: move StoreConfiguration implementation into a separate class.
@InterfaceAudience.Private
-public class HStore implements Store, StoreConfiguration {
+public class HStore implements Store {
+ public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
+
static final Log LOG = LogFactory.getLog(HStore.class);
protected final MemStore memstore;
@@ -124,7 +127,6 @@ public class HStore implements Store, St
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
static int closeCheckInterval = 0;
- private final int blockingStoreFileCount;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object();
@@ -133,12 +135,7 @@ public class HStore implements Store, St
private ScanInfo scanInfo;
- /*
- * List of store files inside this store. This is an immutable list that
- * is atomically replaced when its contents change.
- */
- private volatile ImmutableList<StoreFile> storefiles = null;
-
+ private StoreFileManager storeFileManager;
final List<StoreFile> filesCompacting = Lists.newArrayList();
// All access must be synchronized.
@@ -213,8 +210,7 @@ public class HStore implements Store, St
// Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family);
- this.blockingStoreFileCount =
- conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
@@ -222,7 +218,9 @@ public class HStore implements Store, St
HStore.closeCheckInterval = conf.getInt(
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
- this.storefiles = sortAndClone(loadStoreFiles());
+
+ this.storeFileManager = new DefaultStoreFileManager(this.comparator);
+ this.storeFileManager.loadFiles(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
this.checksumType = getChecksumType(conf);
@@ -293,21 +291,18 @@ public class HStore implements Store, St
return this.fs;
}
- /* Implementation of StoreConfiguration */
+ /* Implementation of StoreConfigInformation */
+ @Override
public long getStoreFileTtl() {
// TTL only applies if there's no MIN_VERSIONs setting on the column.
return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
}
- public Long getMajorCompactionPeriod() {
- String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
- return (strCompactionTime != null) ? new Long(strCompactionTime) : null;
- }
-
+ @Override
public long getMemstoreFlushSize() {
return this.region.memstoreFlushSize;
}
- /* End implementation of StoreConfiguration */
+ /* End implementation of StoreConfigInformation */
/**
* Returns the configured bytesPerChecksum value.
@@ -345,7 +340,7 @@ public class HStore implements Store, St
}
/**
- * @return The maximum sequence id in all store files.
+ * @return The maximum sequence id in all store files. Used for log replay.
*/
long getMaxSequenceId(boolean includeBulkFiles) {
return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
@@ -533,8 +528,8 @@ public class HStore implements Store, St
* @return All store files.
*/
@Override
- public List<StoreFile> getStorefiles() {
- return this.storefiles;
+ public Collection<StoreFile> getStorefiles() {
+ return this.storeFileManager.getStorefiles();
}
@Override
@@ -637,11 +632,9 @@ public class HStore implements Store, St
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
- ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
- newFiles.add(sf);
- this.storefiles = sortAndClone(newFiles);
+ this.storeFileManager.insertNewFile(sf);
} finally {
- // We need the lock, as long as we are updating the storefiles
+ // We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
@@ -664,13 +657,11 @@ public class HStore implements Store, St
}
@Override
- public ImmutableList<StoreFile> close() throws IOException {
+ public ImmutableCollection<StoreFile> close() throws IOException {
this.lock.writeLock().lock();
try {
- ImmutableList<StoreFile> result = storefiles;
-
// Clear so metrics doesn't find them.
- storefiles = ImmutableList.of();
+ ImmutableCollection<StoreFile> result = storeFileManager.clearFiles();
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
@@ -967,7 +958,7 @@ public class HStore implements Store, St
}
/*
- * Change storefiles adding into place the Reader produced by this new flush.
+ * Change storeFiles adding into place the Reader produced by this new flush.
* @param sf
* @param set That was used to make the passed file <code>p</code>.
* @throws IOException
@@ -978,13 +969,10 @@ public class HStore implements Store, St
throws IOException {
this.lock.writeLock().lock();
try {
- ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
- newList.add(sf);
- storefiles = sortAndClone(newList);
-
+ this.storeFileManager.insertNewFile(sf);
this.memstore.clearSnapshot(set);
} finally {
- // We need the lock, as long as we are updating the storefiles
+ // We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
@@ -1014,14 +1002,13 @@ public class HStore implements Store, St
* @return all scanners for this store
*/
protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
- boolean isGet,
- boolean isCompaction,
- ScanQueryMatcher matcher) throws IOException {
- List<StoreFile> storeFiles;
+ boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+ byte[] stopRow) throws IOException {
+ Collection<StoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock();
try {
- storeFiles = this.getStorefiles();
+ storeFilesToScan = this.storeFileManager.getFilesForScanOrGet(isGet, startRow, stopRow);
memStoreScanners = this.memstore.getScanners();
} finally {
this.lock.readLock().unlock();
@@ -1033,7 +1020,7 @@ public class HStore implements Store, St
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner
- .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
+ .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
@@ -1153,15 +1140,21 @@ public class HStore implements Store, St
return sfs;
}
- @Override
- public void compactRecentForTesting(int N) throws IOException {
+ /**
+ * This method tries to compact N recent files for testing.
+ * Note that because compacting "recent" files only makes sense for some policies,
+ * e.g. the default one, it assumes the default policy is used. It doesn't use the policy,
+ * but instead builds the compaction candidate list by itself.
+ * @param N Number of files.
+ */
+ public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
List<StoreFile> filesToCompact;
boolean isMajor;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
- filesToCompact = Lists.newArrayList(storefiles);
+ filesToCompact = Lists.newArrayList(storeFileManager.getStorefiles());
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
@@ -1176,7 +1169,7 @@ public class HStore implements Store, St
}
filesToCompact = filesToCompact.subList(count - N, count);
- isMajor = (filesToCompact.size() == storefiles.size());
+ isMajor = (filesToCompact.size() == storeFileManager.getStorefileCount());
filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
}
@@ -1204,7 +1197,7 @@ public class HStore implements Store, St
@Override
public boolean hasReferences() {
- return StoreUtils.hasReferences(this.storefiles);
+ return StoreUtils.hasReferences(this.storeFileManager.getStorefiles());
}
@Override
@@ -1214,15 +1207,14 @@ public class HStore implements Store, St
@Override
public boolean isMajorCompaction() throws IOException {
- for (StoreFile sf : this.storefiles) {
+ for (StoreFile sf : this.storeFileManager.getStorefiles()) {
+ // TODO: what are these reader checks all over the place?
if (sf.getReader() == null) {
LOG.debug("StoreFile " + sf + " has null Reader");
return false;
}
}
-
- List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
- return compactionPolicy.isMajorCompaction(candidates);
+ return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles());
}
public CompactionRequest requestCompaction() throws IOException {
@@ -1238,22 +1230,13 @@ public class HStore implements Store, St
CompactionRequest ret = null;
this.lock.readLock().lock();
try {
+ List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
synchronized (filesCompacting) {
- // candidates = all storefiles not already in compaction queue
- List<StoreFile> candidates = Lists.newArrayList(storefiles);
- if (!filesCompacting.isEmpty()) {
- // exclude all files older than the newest file we're currently
- // compacting. this allows us to preserve contiguity (HBASE-2856)
- StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
- int idx = candidates.indexOf(last);
- Preconditions.checkArgument(idx != -1);
- candidates.subList(0, idx + 1).clear();
- }
-
+ // First we need to pre-select compaction, and then pre-compact selection!
+ candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
boolean override = false;
if (region.getCoprocessorHost() != null) {
- override = region.getCoprocessorHost().preCompactSelection(
- this, candidates);
+ override = region.getCoprocessorHost().preCompactSelection(this, candidates);
}
CompactSelection filesToCompact;
if (override) {
@@ -1284,9 +1267,8 @@ public class HStore implements Store, St
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
- // major compaction iff all StoreFiles are included
boolean isMajor =
- (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
+ (filesToCompact.getFilesToCompact().size() == this.getStorefilesCount());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
@@ -1382,25 +1364,22 @@ public class HStore implements Store, St
this.family.getBloomFilterType(), this.dataBlockEncoder);
result.createReader();
}
+
try {
this.lock.writeLock().lock();
try {
- // Change this.storefiles so it reflects new state but do not
+ // Change this.storeFiles so it reflects new state but do not
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
- ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
- newStoreFiles.removeAll(compactedFiles);
- filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
-
- // If a StoreFile result, move it into place. May be null.
+ List<StoreFile> results = new ArrayList<StoreFile>(1);
if (result != null) {
- newStoreFiles.add(result);
+ results.add(result);
}
-
- this.storefiles = sortAndClone(newStoreFiles);
+ this.storeFileManager.addCompactionResults(compactedFiles, results);
+ filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
} finally {
- // We need the lock, as long as we are updating the storefiles
+ // We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
@@ -1427,7 +1406,7 @@ public class HStore implements Store, St
// 4. Compute new store size
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
- for (StoreFile hsf : this.storefiles) {
+ for (StoreFile hsf : this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = hsf.getReader();
if (r == null) {
LOG.warn("StoreFile " + hsf + " has a null Reader");
@@ -1439,21 +1418,6 @@ public class HStore implements Store, St
return result;
}
- public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
- Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
- ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
- return newList;
- }
-
- // ////////////////////////////////////////////////////////////////////////////
- // Accessors.
- // (This is the only section that is directly useful!)
- //////////////////////////////////////////////////////////////////////////////
- @Override
- public int getNumberOfStoreFiles() {
- return this.storefiles.size();
- }
-
/*
* @param wantedVersions How many versions were asked for.
* @return wantedVersions or this families' {@link HConstants#VERSIONS}.
@@ -1490,10 +1454,18 @@ public class HStore implements Store, St
// First go to the memstore. Pick up deletes and candidates.
this.memstore.getRowKeyAtOrBefore(state);
// Check if match, if we got a candidate on the asked for 'kv' row.
- // Process each store file. Run through from newest to oldest.
- for (StoreFile sf : Lists.reverse(storefiles)) {
- // Update the candidate keys from the current map file
- rowAtOrBeforeFromStoreFile(sf, state);
+ // Process each relevant store file. Run through from newest to oldest.
+ Iterator<StoreFile> sfIterator =
+ this.storeFileManager.getCandidateFilesForRowKeyBefore(state.getTargetKey());
+ while (sfIterator.hasNext()) {
+ StoreFile sf = sfIterator.next();
+ sfIterator.remove(); // Remove sf from iterator.
+ boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
+ if (haveNewCandidate) {
+ // TODO: we may have an optimization here which stops the search if we find an exact match.
+ sfIterator = this.storeFileManager.updateCandidateFilesForRowKeyBefore(sfIterator,
+ state.getTargetKey(), state.getCandidate());
+ }
}
return state.getCandidate();
} finally {
@@ -1506,22 +1478,23 @@ public class HStore implements Store, St
* @param f
* @param state
* @throws IOException
+ * @return True iff the candidate has been updated in the state.
*/
- private void rowAtOrBeforeFromStoreFile(final StoreFile f,
+ private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
final GetClosestRowBeforeTracker state)
throws IOException {
StoreFile.Reader r = f.getReader();
if (r == null) {
LOG.warn("StoreFile " + f + " has a null Reader");
- return;
+ return false;
}
if (r.getEntries() == 0) {
LOG.warn("StoreFile " + f + " is a empty store file");
- return;
+ return false;
}
// TODO: Cache these keys rather than make each time?
byte [] fk = r.getFirstKey();
- if (fk == null) return;
+ if (fk == null) return false;
KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
byte [] lk = r.getLastKey();
KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
@@ -1529,7 +1502,7 @@ public class HStore implements Store, St
if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
// If last key in file is not of the target table, no candidates in this
// file. Return.
- if (!state.isTargetTable(lastKV)) return;
+ if (!state.isTargetTable(lastKV)) return false;
// If the row we're looking for is past the end of file, set search key to
// last key. TODO: Cache last and first key rather than make each time.
firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
@@ -1537,10 +1510,10 @@ public class HStore implements Store, St
// Get a scanner that caches blocks and that uses pread.
HFileScanner scanner = r.getScanner(true, true, false);
// Seek scanner. If can't seek it, return.
- if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
+ if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
// If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
// Unlikely that there'll be an instance of actual first row in table.
- if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
+ if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
// If here, need to start backing up.
while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
firstOnRow.getKeyLength())) {
@@ -1550,10 +1523,11 @@ public class HStore implements Store, St
// Make new first on row.
firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
// Seek scanner. If can't seek it, break.
- if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
+ if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
// If we find something, break;
- if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
+ if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
}
+ return false;
}
/*
@@ -1612,17 +1586,12 @@ public class HStore implements Store, St
public boolean canSplit() {
this.lock.readLock().lock();
try {
- // Not splitable if we find a reference store file present in the store.
- for (StoreFile sf : storefiles) {
- if (sf.isReference()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(sf + " is not splittable");
- }
- return false;
- }
+ // Not split-able if we find a reference store file present in the store.
+ boolean result = !hasReferences();
+ if (!result && LOG.isDebugEnabled()) {
+ LOG.debug("Cannot split region due to reference files being there");
}
-
- return true;
+ return result;
} finally {
this.lock.readLock().unlock();
}
@@ -1632,64 +1601,14 @@ public class HStore implements Store, St
public byte[] getSplitPoint() {
this.lock.readLock().lock();
try {
- // sanity checks
- if (this.storefiles.isEmpty()) {
- return null;
- }
// Should already be enforced by the split policy!
assert !this.region.getRegionInfo().isMetaRegion();
-
- // Not splitable if we find a reference store file present in the store.
- long maxSize = 0L;
- StoreFile largestSf = null;
- for (StoreFile sf : storefiles) {
- if (sf.isReference()) {
- // Should already be enforced since we return false in this case
- assert false : "getSplitPoint() called on a region that can't split!";
- return null;
- }
-
- StoreFile.Reader r = sf.getReader();
- if (r == null) {
- LOG.warn("Storefile " + sf + " Reader is null");
- continue;
- }
-
- long size = r.length();
- if (size > maxSize) {
- // This is the largest one so far
- maxSize = size;
- largestSf = sf;
- }
- }
-
- StoreFile.Reader r = largestSf.getReader();
- if (r == null) {
- LOG.warn("Storefile " + largestSf + " Reader is null");
+ // Not split-able if we find a reference store file present in the store.
+ if (hasReferences()) {
+ assert false : "getSplitPoint() called on a region that can't split!";
return null;
}
- // Get first, last, and mid keys. Midkey is the key that starts block
- // in middle of hfile. Has column and timestamp. Need to return just
- // the row we want to split on as midkey.
- byte [] midkey = r.midkey();
- if (midkey != null) {
- KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
- byte [] fk = r.getFirstKey();
- KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
- byte [] lk = r.getLastKey();
- KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
- // if the midkey is the same as the first or last keys, then we cannot
- // (ever) split this region.
- if (this.comparator.compareRows(mk, firstKey) == 0 ||
- this.comparator.compareRows(mk, lastKey) == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("cannot split because midkey is the same as first or " +
- "last row");
- }
- return null;
- }
- return mk.getRow();
- }
+ return this.storeFileManager.getSplitPoint();
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this, e);
} finally {
@@ -1745,7 +1664,7 @@ public class HStore implements Store, St
@Override
public int getStorefilesCount() {
- return this.storefiles.size();
+ return this.storeFileManager.getStorefileCount();
}
@Override
@@ -1756,7 +1675,7 @@ public class HStore implements Store, St
@Override
public long getStorefilesSize() {
long size = 0;
- for (StoreFile s: storefiles) {
+ for (StoreFile s: this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
@@ -1770,7 +1689,7 @@ public class HStore implements Store, St
@Override
public long getStorefilesIndexSize() {
long size = 0;
- for (StoreFile s: storefiles) {
+ for (StoreFile s: this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
@@ -1784,7 +1703,7 @@ public class HStore implements Store, St
@Override
public long getTotalStaticIndexSize() {
long size = 0;
- for (StoreFile s : storefiles) {
+ for (StoreFile s : this.storeFileManager.getStorefiles()) {
size += s.getReader().getUncompressedDataIndexSize();
}
return size;
@@ -1793,7 +1712,7 @@ public class HStore implements Store, St
@Override
public long getTotalStaticBloomSize() {
long size = 0;
- for (StoreFile s : storefiles) {
+ for (StoreFile s : this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = s.getReader();
size += r.getTotalBloomSize();
}
@@ -1811,12 +1730,12 @@ public class HStore implements Store, St
@Override
public int getCompactPriority(int priority) {
- // If this is a user-requested compaction, leave this at the highest priority
- if(priority == Store.PRIORITY_USER) {
- return Store.PRIORITY_USER;
- } else {
- return this.blockingStoreFileCount - this.storefiles.size();
+ // If this is a user-requested compaction, leave this at the user priority
+ if (priority != Store.PRIORITY_USER) {
+ priority = this.compactionPolicy.getSystemCompactionPriority(
+ this.storeFileManager.getStorefiles());
}
+ return priority;
}
@Override
@@ -1927,7 +1846,7 @@ public class HStore implements Store, St
@Override
public boolean needsCompaction() {
- return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size());
+ return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting);
}
@Override
@@ -1937,7 +1856,7 @@ public class HStore implements Store, St
public static final long FIXED_OVERHEAD =
ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
- + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java Thu Feb 14 13:35:54 2013
@@ -28,13 +28,15 @@ import org.apache.hadoop.classification.
* rows by a prefix of the row-key
*
* This ensures that a region is not split "inside" a prefix of a row key.
- * I.e. rows can be co-located in a regionb by their prefix.
+ * I.e. rows can be co-located in a region by their prefix.
*/
@InterfaceAudience.Private
public class KeyPrefixRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
private static final Log LOG = LogFactory
.getLog(KeyPrefixRegionSplitPolicy.class);
- public static final String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
+ @Deprecated
+ public static final String PREFIX_LENGTH_KEY_DEPRECATED = "prefix_split_key_policy.prefix_length";
+ public static final String PREFIX_LENGTH_KEY = "KeyPrefixRegionSplitPolicy.prefix_length";
private int prefixLength = 0;
@@ -48,10 +50,14 @@ public class KeyPrefixRegionSplitPolicy
String prefixLengthString = region.getTableDesc().getValue(
PREFIX_LENGTH_KEY);
if (prefixLengthString == null) {
- LOG.error(PREFIX_LENGTH_KEY + " not specified for table "
- + region.getTableDesc().getNameAsString()
- + ". Using default RegionSplitPolicy");
- return;
+ //read the deprecated value
+ prefixLengthString = region.getTableDesc().getValue(PREFIX_LENGTH_KEY_DEPRECATED);
+ if (prefixLengthString == null) {
+ LOG.error(PREFIX_LENGTH_KEY + " not specified for table "
+ + region.getTableDesc().getNameAsString()
+ + ". Using default RegionSplitPolicy");
+ return;
+ }
}
try {
prefixLength = Integer.parseInt(prefixLengthString);
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Feb 14 13:35:54 2013
@@ -61,7 +61,7 @@ public class MemStore implements HeapSiz
static final String USEMSLAB_KEY =
"hbase.hregion.memstore.mslab.enabled";
- private static final boolean USEMSLAB_DEFAULT = false;
+ private static final boolean USEMSLAB_DEFAULT = true;
private Configuration conf;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Thu Feb 14 13:35:54 2013
@@ -85,7 +85,7 @@ class MemStoreFlusher implements FlushRe
"hbase.regionserver.global.memstore.upperLimit";
private static final String LOWER_KEY =
"hbase.regionserver.global.memstore.lowerLimit";
- private long blockingStoreFilesNumber;
+ private int blockingStoreFileCount;
private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter();
@@ -112,8 +112,8 @@ class MemStoreFlusher implements FlushRe
"because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
}
this.globalMemStoreLimitLowMark = lower;
- this.blockingStoreFilesNumber =
- conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+ this.blockingStoreFileCount =
+ conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000);
this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
@@ -482,7 +482,7 @@ class MemStoreFlusher implements FlushRe
private boolean isTooManyStoreFiles(HRegion region) {
for (Store hstore : region.stores.values()) {
- if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
+ if (hstore.getStorefilesCount() > this.blockingStoreFileCount) {
return true;
}
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Thu Feb 14 13:35:54 2013
@@ -313,11 +313,11 @@ public class SplitTransaction {
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
+ HRegion a = createDaughterRegion(this.hri_a);
// Ditto
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
+ HRegion b = createDaughterRegion(this.hri_b);
// This is the point of no return. Adding subsequent edits to .META. as we
// do below when we do the daughter opens adding each to .META. can fail in
@@ -696,20 +696,10 @@ public class SplitTransaction {
* @throws IOException
* @see #cleanupDaughterRegion(FileSystem, Path, String)
*/
- HRegion createDaughterRegion(final HRegionInfo hri,
- final RegionServerServices rsServices)
- throws IOException {
+ HRegion createDaughterRegion(final HRegionInfo hri) throws IOException {
// Package private so unit tests have access.
- FileSystem fs = this.parent.getFilesystem();
- Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
- this.splitdir, hri);
- HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
- this.parent.getLog(), fs, this.parent.getBaseConf(),
- hri, this.parent.getTableDesc(), rsServices);
- r.readRequestsCount.set(this.parent.getReadRequestsCount() / 2);
- r.writeRequestsCount.set(this.parent.getWriteRequestsCount() / 2);
- HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
- return r;
+ Path regionDir = getSplitDirForDaughter(this.splitdir, hri);
+ return this.parent.createDaughterRegion(hri, regionDir);
}
private static void cleanupDaughterRegion(final FileSystem fs,
@@ -723,15 +713,13 @@ public class SplitTransaction {
/*
* Get the daughter directories in the splits dir. The splits dir is under
* the parent regions' directory.
- * @param fs
* @param splitdir
* @param hri
* @return Path to daughter split dir.
* @throws IOException
*/
- private static Path getSplitDirForDaughter(final FileSystem fs,
- final Path splitdir, final HRegionInfo hri)
- throws IOException {
+ private static Path getSplitDirForDaughter(final Path splitdir, final HRegionInfo hri)
+ throws IOException {
return new Path(splitdir, hri.getEncodedName());
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Feb 14 13:35:54 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
@@ -41,7 +42,7 @@ import com.google.common.collect.Immutab
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface Store extends HeapSize {
+public interface Store extends HeapSize, StoreConfigInformation {
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
@@ -52,7 +53,7 @@ public interface Store extends HeapSize
// General Accessors
public KeyValue.KVComparator getComparator();
- public List<StoreFile> getStorefiles();
+ public Collection<StoreFile> getStorefiles();
/**
* Close all the readers We don't need to worry about subsequent requests because the HRegion
@@ -60,7 +61,7 @@ public interface Store extends HeapSize
* @return the {@link StoreFile StoreFiles} that were previously being used.
* @throws IOException on failure
*/
- public ImmutableList<StoreFile> close() throws IOException;
+ public Collection<StoreFile> close() throws IOException;
/**
* Return a scanner for both the memstore and the HStore files. Assumes we are not in a
@@ -208,11 +209,6 @@ public interface Store extends HeapSize
*/
public HFileDataBlockEncoder getDataBlockEncoder();
- /**
- * @return the number of files in this store
- */
- public int getNumberOfStoreFiles();
-
/** @return aggregate size of all HStores used in the last compaction */
public long getLastCompactSize();
@@ -257,13 +253,6 @@ public interface Store extends HeapSize
// Test-helper methods
/**
- * Compact the most recent N files. Used in testing.
- * @param N number of files to compact. Must be less than or equal to current number of files.
- * @throws IOException on failure
- */
- public void compactRecentForTesting(int N) throws IOException;
-
- /**
* Used for tests.
* @return cache configuration for this Store.
*/
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Thu Feb 14 13:35:54 2013
@@ -952,6 +952,38 @@ public class StoreFile {
}
/**
+ * Gets the approximate mid-point of this file that is optimal for use in splitting it.
+ * @param comparator Comparator used to compare KVs.
+ * @return The split point row, or null if splitting is not possible, or reader is null.
+ */
+ byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
+ if (this.reader == null) {
+ LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
+ return null;
+ }
+ // Get the first, last, and mid keys. The midkey is the key that starts the
+ // block in the middle of the hfile. It has a column and timestamp, but we
+ // need to return just its row, since that is what we split on.
+ byte [] midkey = this.reader.midkey();
+ if (midkey != null) {
+ KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
+ byte [] fk = this.reader.getFirstKey();
+ KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
+ byte [] lk = this.reader.getLastKey();
+ KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
+ // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
+ if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cannot split because midkey is the same as first or last row");
+ }
+ return null;
+ }
+ return mk.getRow();
+ }
+ return null;
+ }
+
+ /**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.
*/
@@ -1770,18 +1802,19 @@ public class StoreFile {
/**
* Useful comparators for comparing StoreFiles.
*/
- abstract static class Comparators {
+ public abstract static class Comparators {
/**
* Comparator that compares based on the Sequence Ids of the
* the StoreFiles. Bulk loads that did not request a seq ID
* are given a seq id of -1; thus, they are placed before all non-
* bulk loads, and bulk loads with sequence Id. Among these files,
- * the bulkLoadTime is used to determine the ordering.
+ * the size is used to determine the ordering, then bulkLoadTime.
* If there are ties, the path name is used as a tie-breaker.
*/
- static final Comparator<StoreFile> SEQ_ID =
+ public static final Comparator<StoreFile> SEQ_ID =
Ordering.compound(ImmutableList.of(
Ordering.natural().onResultOf(new GetSeqId()),
+ Ordering.natural().onResultOf(new GetFileSize()).reverse(),
Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetPathName())
));
@@ -1793,6 +1826,13 @@ public class StoreFile {
}
}
+ private static class GetFileSize implements Function<StoreFile, Long> {
+ @Override
+ public Long apply(StoreFile sf) {
+ return sf.getReader().length();
+ }
+ }
+
private static class GetBulkTime implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
@@ -1807,19 +1847,5 @@ public class StoreFile {
return sf.getPath().getName();
}
}
-
- /**
- * FILE_SIZE = descending sort StoreFiles (largest --> smallest in size)
- */
- static final Comparator<StoreFile> FILE_SIZE = Ordering.natural().reverse()
- .onResultOf(new Function<StoreFile, Long>() {
- @Override
- public Long apply(StoreFile sf) {
- if (sf == null) {
- throw new IllegalArgumentException("StorFile can not be null");
- }
- return sf.getReader().length();
- }
- });
}
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Feb 14 13:35:54 2013
@@ -206,7 +206,7 @@ public class StoreScanner extends NonLaz
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
- isCompaction, matcher));
+ isCompaction, matcher, scan.getStartRow(), scan.getStopRow()));
}
/**
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java Thu Feb 14 13:35:54 2013
@@ -29,18 +29,19 @@ public class StoreUtils {
/**
* Creates a deterministic hash code for store file collection.
*/
- public static Integer getDeterministicRandomSeed(final List<StoreFile> files) {
+ public static Integer getDeterministicRandomSeed(final Collection<StoreFile> files) {
if (files != null && !files.isEmpty()) {
- return files.get(0).getPath().getName().hashCode();
+ return files.iterator().next().getPath().getName().hashCode();
}
return null;
}
/**
* Determines whether any files in the collection are references.
+ * @param files The files.
*/
public static boolean hasReferences(final Collection<StoreFile> files) {
- if (files != null && files.size() > 0) {
+ if (files != null) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
return true;
@@ -53,7 +54,7 @@ public class StoreUtils {
/**
* Gets lowest timestamp from candidate StoreFiles
*/
- public static long getLowestTimestamp(final List<StoreFile> candidates)
+ public static long getLowestTimestamp(final Collection<StoreFile> candidates)
throws IOException {
long minTs = Long.MAX_VALUE;
for (StoreFile storeFile : candidates) {
@@ -61,4 +62,24 @@ public class StoreUtils {
}
return minTs;
}
+
+ /**
+ * Gets the largest file that has a reader out of the given collection of files.
+ * @param candidates The files to choose from.
+ * @return The largest file; null if no file has a reader.
+ */
+ static StoreFile getLargestFile(final Collection<StoreFile> candidates) {
+ long maxSize = -1L;
+ StoreFile largestSf = null;
+ for (StoreFile sf : candidates) {
+ StoreFile.Reader r = sf.getReader();
+ if (r == null) continue;
+ long size = r.length();
+ if (size > maxSize) {
+ maxSize = size;
+ largestSf = sf;
+ }
+ }
+ return largestSf;
+ }
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java Thu Feb 14 13:35:54 2013
@@ -58,43 +58,6 @@ public class CompactSelection {
}
/**
- * Select the expired store files to compact
- *
- * @param maxExpiredTimeStamp
- * The store file will be marked as expired if its max time stamp is
- * less than this maxExpiredTimeStamp.
- * @return A CompactSelection contains the expired store files as
- * filesToCompact
- */
- public CompactSelection selectExpiredStoreFilesToCompact(
- long maxExpiredTimeStamp) {
- if (filesToCompact == null || filesToCompact.size() == 0)
- return null;
- ArrayList<StoreFile> expiredStoreFiles = null;
- boolean hasExpiredStoreFiles = false;
- CompactSelection expiredSFSelection = null;
-
- for (StoreFile storeFile : this.filesToCompact) {
- if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
- LOG.info("Deleting the expired store file by compaction: "
- + storeFile.getPath() + " whose maxTimeStamp is "
- + storeFile.getReader().getMaxTimestamp()
- + " while the max expired timestamp is " + maxExpiredTimeStamp);
- if (!hasExpiredStoreFiles) {
- expiredStoreFiles = new ArrayList<StoreFile>();
- hasExpiredStoreFiles = true;
- }
- expiredStoreFiles.add(storeFile);
- }
- }
-
- if (hasExpiredStoreFiles) {
- expiredSFSelection = new CompactSelection(expiredStoreFiles);
- }
- return expiredSFSelection;
- }
-
- /**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
@@ -163,10 +126,6 @@ public class CompactSelection {
return selectionTime;
}
- public CompactSelection subList(int start, int end) {
- throw new UnsupportedOperationException();
- }
-
public CompactSelection getSubList(int start, int end) {
filesToCompact = filesToCompact.subList(start, end);
return this;
@@ -175,8 +134,4 @@ public class CompactSelection {
public void clearSubList(int start, int end) {
filesToCompact.subList(start, end).clear();
}
-
- private boolean isValidHour(int hour) {
- return (hour >= 0 && hour <= 23);
- }
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java Thu Feb 14 13:35:54 2013
@@ -24,7 +24,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
/**
* Compaction configuration for a particular instance of HStore.
@@ -49,7 +50,7 @@ public class CompactionConfiguration {
private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
Configuration conf;
- StoreConfiguration storeConfig;
+ StoreConfigInformation storeConfigInfo;
long maxCompactSize;
long minCompactSize;
@@ -63,14 +64,15 @@ public class CompactionConfiguration {
boolean shouldDeleteExpired;
long majorCompactionPeriod;
float majorCompactionJitter;
+ int blockingStoreFileCount;
- CompactionConfiguration(Configuration conf, StoreConfiguration storeConfig) {
+ CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
- this.storeConfig = storeConfig;
+ this.storeConfigInfo = storeConfigInfo;
maxCompactSize = conf.getLong(CONFIG_PREFIX + "max.size", Long.MAX_VALUE);
minCompactSize = conf.getLong(CONFIG_PREFIX + "min.size",
- storeConfig.getMemstoreFlushSize());
+ storeConfigInfo.getMemstoreFlushSize());
minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
@@ -89,10 +91,12 @@ public class CompactionConfiguration {
}
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
- 2 * maxFilesToCompact * storeConfig.getMemstoreFlushSize());
+ 2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+ blockingStoreFileCount =
+ conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
LOG.info("Compaction configuration " + this.toString());
}
@@ -117,6 +121,13 @@ public class CompactionConfiguration {
}
/**
+ * @return store file count that will cause the memstore of this store to be blocked.
+ */
+ int getBlockingStorefileCount() {
+ return this.blockingStoreFileCount;
+ }
+
+ /**
* @return lower bound below which compaction is selected without ratio test
*/
long getMinCompactSize() {
@@ -184,12 +195,6 @@ public class CompactionConfiguration {
* Major compactions are selected periodically according to this parameter plus jitter
*/
long getMajorCompactionPeriod() {
- if (storeConfig != null) {
- Long storeSpecificPeriod = storeConfig.getMajorCompactionPeriod();
- if (storeSpecificPeriod != null) {
- return storeSpecificPeriod;
- }
- }
return majorCompactionPeriod;
}
@@ -211,4 +216,4 @@ public class CompactionConfiguration {
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Thu Feb 14 13:35:54 2013
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -53,6 +54,16 @@ public abstract class CompactionPolicy e
HStore store;
/**
+ * This is called before coprocessor preCompactSelection and should filter the candidates
+ * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
+ * @param candidateFiles candidate files, ordered from oldest to newest
+ * @param filesCompacting files currently compacting
+ * @return the list of files that can theoretically be compacted.
+ */
+ public abstract List<StoreFile> preSelectCompaction(
+ List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);
+
+ /**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
@@ -62,11 +73,21 @@ public abstract class CompactionPolicy e
final boolean forceMajor) throws IOException;
/**
+ * @param storeFiles Store files in the store.
+ * @return The system compaction priority of the store, based on storeFiles.
+ * Smaller values mean higher priority; 1 is the user priority, and only
+ * very important, blocking compactions should use values lower than that.
+ * With default settings, depending on the number of store files, the
+ * non-blocking priority will be in the 2-6 range.
+ */
+ public abstract int getSystemCompactionPriority(final Collection<StoreFile> storeFiles);
+
+ /**
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
public abstract boolean isMajorCompaction(
- final List<StoreFile> filesToCompact) throws IOException;
+ final Collection<StoreFile> filesToCompact) throws IOException;
/**
* @param compactionSize Total size of some compaction
@@ -75,10 +96,12 @@ public abstract class CompactionPolicy e
public abstract boolean throttleCompaction(long compactionSize);
/**
- * @param numCandidates Number of candidate store files
+ * @param storeFiles Current store files.
+ * @param filesCompacting files currently compacting.
* @return whether a compactionSelection is possible
*/
- public abstract boolean needsCompaction(int numCandidates);
+ public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
+ final List<StoreFile> filesCompacting);
/**
* Inform the policy that some configuration has been change,
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Thu Feb 14 13:35:54 2013
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collection;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
@@ -52,6 +54,26 @@ public class DefaultCompactionPolicy ext
compactor = new DefaultCompactor(this);
}
+ @Override
+ public List<StoreFile> preSelectCompaction(
+ List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
+ // candidates = all storefiles not already in compaction queue
+ if (!filesCompacting.isEmpty()) {
+ // exclude all files older than the newest file we're currently
+ // compacting. this allows us to preserve contiguity (HBASE-2856)
+ StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+ int idx = candidateFiles.indexOf(last);
+ Preconditions.checkArgument(idx != -1);
+ candidateFiles.subList(0, idx + 1).clear();
+ }
+ return candidateFiles;
+ }
+
+ @Override
+ public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
+ return this.comConf.getBlockingStorefileCount() - storeFiles.size();
+ }
+
/**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
@@ -293,7 +315,7 @@ public class DefaultCompactionPolicy ext
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
- public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
+ public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
@@ -308,7 +330,7 @@ public class DefaultCompactionPolicy ext
long cfTtl = this.store.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
- StoreFile sf = filesToCompact.get(0);
+ StoreFile sf = filesToCompact.iterator().next();
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
@@ -337,7 +359,7 @@ public class DefaultCompactionPolicy ext
return result;
}
- public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
+ public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
@@ -366,11 +388,10 @@ public class DefaultCompactionPolicy ext
return compactionSize > comConf.getThrottlePoint();
}
- /**
- * @param numCandidates Number of candidate store files
- * @return whether a compactionSelection is possible
- */
- public boolean needsCompaction(int numCandidates) {
+ @Override
+ public boolean needsCompaction(final Collection<StoreFile> storeFiles,
+ final List<StoreFile> filesCompacting) {
+ int numCandidates = storeFiles.size() - filesCompacting.size();
return numCandidates > comConf.getMinFilesToCompact();
}
@@ -390,4 +411,4 @@ public class DefaultCompactionPolicy ext
}
return (currentHour >= startHour || currentHour < endHour);
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Thu Feb 14 13:35:54 2013
@@ -87,6 +87,7 @@ public class OpenRegionHandler extends E
@Override
public void process() throws IOException {
boolean openSuccessful = false;
+ boolean transitionToFailedOpen = false;
final String regionName = regionInfo.getRegionNameAsString();
try {
@@ -130,6 +131,7 @@ public class OpenRegionHandler extends E
HRegion region = openRegion();
if (region == null) {
tryTransitionFromOpeningToFailedOpen(regionInfo);
+ transitionToFailedOpen = true;
return;
}
boolean failed = true;
@@ -142,6 +144,7 @@ public class OpenRegionHandler extends E
this.rsServices.isStopping()) {
cleanupFailedOpen(region);
tryTransitionFromOpeningToFailedOpen(regionInfo);
+ transitionToFailedOpen = true;
return;
}
@@ -154,6 +157,7 @@ public class OpenRegionHandler extends E
// In all cases, we try to transition to failed_open to be safe.
cleanupFailedOpen(region);
tryTransitionFromOpeningToFailedOpen(regionInfo);
+ transitionToFailedOpen = true;
return;
}
@@ -197,6 +201,8 @@ public class OpenRegionHandler extends E
" should be closed is now opened."
);
}
+ } else if (transitionToFailedOpen == false) {
+ tryTransitionFromOpeningToFailedOpen(regionInfo);
}
}
}
@@ -455,7 +461,7 @@ public class OpenRegionHandler extends E
return region;
}
- private void cleanupFailedOpen(final HRegion region) throws IOException {
+ void cleanupFailedOpen(final HRegion region) throws IOException {
if (region != null) region.close();
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Feb 14 13:35:54 2013
@@ -827,6 +827,7 @@ class FSHLog implements HLog, Syncable {
}
if (this.writer != null) {
this.writer.close();
+ this.writer = null;
}
}
}
@@ -1081,35 +1082,43 @@ class FSHLog implements HLog, Syncable {
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions till this
// number has been successfully synced.
+ IOException ioe = null;
+ List<Entry> pending = null;
synchronized (flushLock) {
if (txid <= this.syncedTillHere) {
return;
}
doneUpto = this.unflushedEntries.get();
- List<Entry> pending = logSyncerThread.getPendingWrites();
+ pending = logSyncerThread.getPendingWrites();
try {
logSyncerThread.hlogFlush(tempWriter, pending);
} catch(IOException io) {
- synchronized (this.updateLock) {
+ ioe = io;
+ LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
+ }
+ }
+ if (ioe != null && pending != null) {
+ synchronized (this.updateLock) {
+ synchronized (flushLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
tempWriter = this.writer;
logSyncerThread.hlogFlush(tempWriter, pending);
}
- }
+ }
}
// another thread might have sync'ed avoid double-sync'ing
if (txid <= this.syncedTillHere) {
return;
}
try {
- tempWriter.sync();
+ if (tempWriter != null) tempWriter.sync();
} catch(IOException ex) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
// TODO: we don't actually need to do it for concurrent close - what is the point
// of syncing new unrelated writer? Keep behavior for now.
tempWriter = this.writer;
- tempWriter.sync();
+ if (tempWriter != null) tempWriter.sync();
}
}
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Feb 14 13:35:54 2013
@@ -83,13 +83,6 @@ import com.google.common.collect.Lists;
public class HLogSplitter {
private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
- /**
- * Name of file that holds recovered edits written by the wal log splitting
- * code, one per region
- */
- public static final String RECOVERED_EDITS = "recovered.edits";
-
-
static final Log LOG = LogFactory.getLog(HLogSplitter.class);
private boolean hasSplit = false;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java Thu Feb 14 13:35:54 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.rest.filter.GzipFilter;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.net.DNS;
@@ -64,7 +65,7 @@ public class RESTServer implements Const
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("bin/hbase rest start", "", options,
"\nTo run the REST server as a daemon, execute " +
- "bin/hbase-daemon.sh start|stop rest [-p <port>] [-ro]\n", true);
+ "bin/hbase-daemon.sh start|stop rest [--infoport <port>] [-p <port>] [-ro]\n", true);
System.exit(exitCode);
}
@@ -84,6 +85,7 @@ public class RESTServer implements Const
options.addOption("p", "port", true, "Port to bind to [default: 8080]");
options.addOption("ro", "readonly", false, "Respond only to GET HTTP " +
"method requests [default: false]");
+ options.addOption(null, "infoport", true, "Port for web UI");
CommandLine commandLine = null;
try {
@@ -107,6 +109,14 @@ public class RESTServer implements Const
LOG.debug("readonly set to true");
}
+ // check for user-defined info server port setting, if so override the conf
+ if (commandLine != null && commandLine.hasOption("infoport")) {
+ String val = commandLine.getOptionValue("infoport");
+ servlet.getConfiguration()
+ .setInt("hbase.rest.info.port", Integer.valueOf(val));
+ LOG.debug("Web UI port set to " + val);
+ }
+
@SuppressWarnings("unchecked")
List<String> remainingArgs = commandLine != null ?
commandLine.getArgList() : new ArrayList<String>();
@@ -169,6 +179,16 @@ public class RESTServer implements Const
machineName);
}
+ // Put up info server.
+ int port = conf.getInt("hbase.rest.info.port", 8085);
+ if (port >= 0) {
+ conf.setLong("startcode", System.currentTimeMillis());
+ String a = conf.get("hbase.rest.info.bindAddress", "0.0.0.0");
+ InfoServer infoServer = new InfoServer("rest", a, port, false, conf);
+ infoServer.setAttribute("hbase.conf", conf);
+ infoServer.start();
+ }
+
// start server
server.start();
server.join();
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Thu Feb 14 13:35:54 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
+import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.util.Shell.ExitCodeException;
@@ -60,6 +61,8 @@ public class ThriftServer {
private Configuration conf;
ThriftServerRunner serverRunner;
+ private InfoServer infoServer;
+
//
// Main program and support routines
//
@@ -86,6 +89,16 @@ public class ThriftServer {
void doMain(final String[] args) throws Exception {
processOptions(args);
serverRunner = new ThriftServerRunner(conf);
+
+ // Put up info server.
+ int port = conf.getInt("hbase.thrift.info.port", 9095);
+ if (port >= 0) {
+ conf.setLong("startcode", System.currentTimeMillis());
+ String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
+ infoServer = new InfoServer("thrift", a, port, false, conf);
+ infoServer.setAttribute("hbase.conf", conf);
+ infoServer.start();
+ }
serverRunner.run();
}
@@ -101,6 +114,7 @@ public class ThriftServer {
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
+ options.addOption(null, "infoport", true, "Port for web UI");
options.addOption("m", MIN_WORKERS_OPTION, true,
"The minimum number of worker threads for " +
@@ -147,6 +161,18 @@ public class ThriftServer {
printUsageAndExit(options, -1);
}
+ // check for user-defined info server port setting, if so override the conf
+ try {
+ if (cmd.hasOption("infoport")) {
+ String val = cmd.getOptionValue("infoport");
+ conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
+ LOG.debug("Web UI port set to " + val);
+ }
+ } catch (NumberFormatException e) {
+ LOG.error("Could not parse the value provided for the infoport option", e);
+ printUsageAndExit(options, -1);
+ }
+
// Make optional changes to the configuration based on command-line options
optionToConf(cmd, MIN_WORKERS_OPTION,
conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
@@ -171,6 +197,14 @@ public class ThriftServer {
}
public void stop() {
+ if (this.infoServer != null) {
+ LOG.info("Stopping infoServer");
+ try {
+ this.infoServer.stop();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
serverRunner.shutdown();
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java Thu Feb 14 13:35:54 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.thrift.Ca
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
+import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -66,6 +67,7 @@ import com.google.common.util.concurrent
* HbaseClient.thrift IDL file.
*/
@InterfaceAudience.Private
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class ThriftServer {
private static final Log log = LogFactory.getLog(ThriftServer.class);
@@ -91,6 +93,7 @@ public class ThriftServer {
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
+ options.addOption(null, "infoport", true, "Port for web UI");
OptionGroup servers = new OptionGroup();
servers.addOption(
@@ -225,15 +228,51 @@ public class ThriftServer {
Configuration conf = HBaseConfiguration.create();
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
+ String implType = "threadpool";
+ if (nonblocking) {
+ implType = "nonblocking";
+ } else if (hsha) {
+ implType = "hsha";
+ }
+
+ conf.set("hbase.regionserver.thrift.server.type", implType);
+ conf.setInt("hbase.regionserver.thrift.port", listenPort);
+
// Construct correct ProtocolFactory
- TProtocolFactory protocolFactory = getTProtocolFactory(cmd.hasOption("compact"));
+ boolean compact = cmd.hasOption("compact");
+ TProtocolFactory protocolFactory = getTProtocolFactory(compact);
THBaseService.Iface handler =
ThriftHBaseServiceHandler.newInstance(conf, metrics);
THBaseService.Processor processor = new THBaseService.Processor(handler);
+ conf.setBoolean("hbase.regionserver.thrift.compact", compact);
boolean framed = cmd.hasOption("framed") || nonblocking || hsha;
TTransportFactory transportFactory = getTTransportFactory(framed);
InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
+ conf.setBoolean("hbase.regionserver.thrift.framed", framed);
+
+ // check for user-defined info server port setting, if so override the conf
+ try {
+ if (cmd.hasOption("infoport")) {
+ String val = cmd.getOptionValue("infoport");
+ conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
+ log.debug("Web UI port set to " + val);
+ }
+ } catch (NumberFormatException e) {
+ log.error("Could not parse the value provided for the infoport option", e);
+ printUsage();
+ System.exit(1);
+ }
+
+ // Put up info server.
+ int port = conf.getInt("hbase.thrift.info.port", 9095);
+ if (port >= 0) {
+ conf.setLong("startcode", System.currentTimeMillis());
+ String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
+ InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
+ infoServer.setAttribute("hbase.conf", conf);
+ infoServer.start();
+ }
if (nonblocking) {
server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);