You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/23 21:54:15 UTC
svn commit: r1425525 [3/7] - in /hbase/branches/0.94-test: ./ bin/ conf/
security/src/main/java/org/apache/hadoop/hbase/ipc/
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/test/java/org/apache/hadoop/hbase/security/access/...
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Dec 23 20:54:12 2012
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -55,12 +56,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -76,6 +79,7 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -228,15 +232,34 @@ public class HRegion implements HeapSize
* The directory for the table this region is part of.
* This directory contains the directory for this region.
*/
- final Path tableDir;
+ private final Path tableDir;
- final HLog log;
- final FileSystem fs;
- final Configuration conf;
- final int rowLockWaitDuration;
+ private final HLog log;
+ private final FileSystem fs;
+ private final Configuration conf;
+ private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
- final HRegionInfo regionInfo;
- final Path regiondir;
+
+ // The internal wait duration to acquire a lock before read/update
+ // from the region. It is not per row. The purpose of this wait time
+ // is to avoid waiting a long time while the region is busy, so that
+ // we can release the IPC handler soon enough to improve the
+ // availability of the region server. It can be adjusted by
+ // tuning configuration "hbase.busy.wait.duration".
+ final long busyWaitDuration;
+ static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+
+ // If updating multiple rows in one call, wait longer,
+ // i.e. waiting for busyWaitDuration * # of rows. However,
+ // we can limit the max multiplier.
+ final int maxBusyWaitMultiplier;
+
+ // Max busy wait duration. There is no point in waiting longer than the RPC
+ // purge timeout, after which an RPC call will be terminated by the RPC engine.
+ final long maxBusyWaitDuration;
+
+ private final HRegionInfo regionInfo;
+ private final Path regiondir;
KeyValue.KVComparator comparator;
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -354,6 +377,10 @@ public class HRegion implements HeapSize
this.coprocessorHost = null;
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.opMetrics = new OperationMetrics();
+
+ this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
+ this.maxBusyWaitMultiplier = 2;
}
/**
@@ -400,6 +427,17 @@ public class HRegion implements HeapSize
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.opMetrics = new OperationMetrics(conf, this.regionInfo);
+ this.busyWaitDuration = conf.getLong(
+ "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
+ this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
+ if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
+ throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
+ + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
+ + maxBusyWaitMultiplier + "). Their product should be positive");
+ }
+ this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
+ 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
/*
* timestamp.slop provides a server-side constraint on the timestamp. This
* assumes that you base your TS around currentTimeMillis(). In this case,
@@ -688,7 +726,7 @@ public class HRegion implements HeapSize
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
if (this.rsAccounting != null) {
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
- }
+ }
return this.memstoreSize.getAndAdd(memStoreSize);
}
@@ -714,7 +752,7 @@ public class HRegion implements HeapSize
// and then create the file
Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-
+
// if datanode crashes or if the RS goes down just before the close is called while trying to
// close the created regioninfo file in the .tmp directory then on next
// creation we will be getting AlreadyCreatedException.
@@ -722,7 +760,7 @@ public class HRegion implements HeapSize
if (FSUtils.isExists(fs, tmpPath)) {
FSUtils.delete(fs, tmpPath, true);
}
-
+
FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
try {
@@ -739,6 +777,26 @@ public class HRegion implements HeapSize
}
}
+ /**
+ * @param fs
+ * @param dir
+ * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
+ * @throws IOException
+ */
+ public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
+ throws IOException {
+ Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
+ if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
+ FSDataInputStream in = fs.open(regioninfo);
+ try {
+ HRegionInfo hri = new HRegionInfo();
+ hri.readFields(in);
+ return hri;
+ } finally {
+ in.close();
+ }
+ }
+
/** @return a HRegionInfo object for this region */
public HRegionInfo getRegionInfo() {
return this.regionInfo;
@@ -883,6 +941,7 @@ public class HRegion implements HeapSize
this.closing.set(true);
status.setStatus("Disabling writes for close");
+ // block waiting for the lock for closing
lock.writeLock().lock();
try {
if (this.isClosed()) {
@@ -984,19 +1043,16 @@ public class HRegion implements HeapSize
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
}
- private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+ static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) {
- ThreadPoolExecutor openAndCloseThreadPool = Threads
- .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadNamePrefix + "-" + count++);
- return t;
- }
- });
- return openAndCloseThreadPool;
+ return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, threadNamePrefix + "-" + count++);
+ }
+ });
}
/**
@@ -1192,6 +1248,7 @@ public class HRegion implements HeapSize
return false;
}
Preconditions.checkArgument(cr.getHRegion().equals(this));
+ // block waiting for the lock for compaction
lock.readLock().lock();
MonitoredTask status = TaskMonitor.get().createStatus(
"Compacting " + cr.getStore() + " in " + this);
@@ -1271,6 +1328,7 @@ public class HRegion implements HeapSize
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.setStatus("Acquiring readlock on region");
+ // block waiting for the lock for flushing cache
lock.readLock().lock();
try {
if (this.closed.get()) {
@@ -1406,6 +1464,7 @@ public class HRegion implements HeapSize
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
status.setStatus("Obtaining lock to block concurrent updates");
+ // block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
long flushsize = this.memstoreSize.get();
status.setStatus("Preparing to flush by snapshotting stores");
@@ -1664,11 +1723,23 @@ public class HRegion implements HeapSize
//////////////////////////////////////////////////////////////////////////////
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @param delete delete object
+ * @param writeToWAL append to the write ahead log or not
+ * @throws IOException read exceptions
+ */
+ public void delete(Delete delete, boolean writeToWAL)
+ throws IOException {
+ delete(delete, null, writeToWAL);
+ }
+
/**
* @param delete delete object
* @param lockid existing lock id, or null for grab a lock
* @param writeToWAL append to the write ahead lock or not
* @throws IOException read exceptions
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public void delete(Delete delete, Integer lockid, boolean writeToWAL)
throws IOException {
@@ -1784,7 +1855,7 @@ public class HRegion implements HeapSize
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
- updatesLock.readLock().lock();
+ lock(updatesLock.readLock());
try {
prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
@@ -1843,6 +1914,7 @@ public class HRegion implements HeapSize
* @param put
* @param lockid
* @throws IOException
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public void put(Put put, Integer lockid) throws IOException {
this.put(put, lockid, put.getWriteToWAL());
@@ -1855,6 +1927,7 @@ public class HRegion implements HeapSize
* @param lockid
* @param writeToWAL
* @throws IOException
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public void put(Put put, Integer lockid, boolean writeToWAL)
throws IOException {
@@ -1940,7 +2013,7 @@ public class HRegion implements HeapSize
System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
return batchMutate(mutationsAndLocks);
}
-
+
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
@@ -2163,7 +2236,7 @@ public class HRegion implements HeapSize
}
}
- this.updatesLock.readLock().lock();
+ lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
//
@@ -2294,7 +2367,7 @@ public class HRegion implements HeapSize
// do after lock
final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-
+
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
// null will be treated as unknown.
@@ -2328,6 +2401,24 @@ public class HRegion implements HeapSize
//the getting of the lock happens before, so that you would just pass it into
//the methods. So in the case of checkAndMutate you could just do lockRow,
//get, put, unlockRow or something
+ /**
+ *
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param compareOp
+ * @param comparator
+ * @param writeToWAL
+ * @throws IOException
+ * @return true if the new put was executed, false otherwise
+ */
+ public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
+ CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
+ boolean writeToWAL)
+ throws IOException {
+ return checkAndMutate(row, family, qualifier, compareOp, comparator, w, null, writeToWAL);
+ }
+
/**
*
* @param row
@@ -2339,6 +2430,7 @@ public class HRegion implements HeapSize
* @param writeToWAL
* @throws IOException
* @return true if the new put was execute, false otherwise
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
@@ -2457,7 +2549,8 @@ public class HRegion implements HeapSize
* this and the synchronize on 'this' inside in internalFlushCache to send
* the notify.
*/
- private void checkResources() {
+ private void checkResources()
+ throws RegionTooBusyException, InterruptedIOException {
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return;
@@ -2475,12 +2568,30 @@ public class HRegion implements HeapSize
" is >= than blocking " +
StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
}
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long timeToWait = startTime + busyWaitDuration - now;
+ if (timeToWait <= 0L) {
+ final long totalTime = now - startTime;
+ this.updatesBlockedMs.add(totalTime);
+ LOG.info("Failed to unblock updates for region " + this + " '"
+ + Thread.currentThread().getName() + "' in " + totalTime
+ + "ms. The region is still busy.");
+ throw new RegionTooBusyException("region is flushing");
+ }
blocked = true;
synchronized(this) {
try {
- wait(threadWakeFrequency);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ wait(Math.min(timeToWait, threadWakeFrequency));
+ } catch (InterruptedException ie) {
+ final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ if (totalTime > 0) {
+ this.updatesBlockedMs.add(totalTime);
+ }
+ LOG.info("Interrupted while waiting to unblock updates for region "
+ + this + " '" + Thread.currentThread().getName() + "'");
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
}
}
}
@@ -2547,7 +2658,7 @@ public class HRegion implements HeapSize
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
- this.updatesLock.readLock().lock();
+ lock(this.updatesLock.readLock());
try {
checkFamilies(familyMap.keySet());
checkTimestamps(familyMap, now);
@@ -2578,7 +2689,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -3172,6 +3283,7 @@ public class HRegion implements HeapSize
* @param lockId The lock ID to release.
*/
public void releaseRowLock(final Integer lockId) {
+ if (lockId == null) return; // null lock id, do nothing
HashedBytes rowKey = lockIds.remove(lockId);
if (rowKey == null) {
LOG.warn("Release unknown lockId: " + lockId);
@@ -3412,6 +3524,10 @@ public class HRegion implements HeapSize
this(scan, null);
}
+ @Override
+ public long getMvccReadPoint() {
+ return this.readPt;
+ }
/**
* Reset both the filter and the old filter.
*/
@@ -3422,7 +3538,7 @@ public class HRegion implements HeapSize
}
@Override
- public synchronized boolean next(List<KeyValue> outResults, int limit)
+ public boolean next(List<KeyValue> outResults, int limit)
throws IOException {
return next(outResults, limit, null);
}
@@ -3442,30 +3558,42 @@ public class HRegion implements HeapSize
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
- results.clear();
-
- boolean returnResult = nextInternal(limit, metric);
-
- outResults.addAll(results);
- resetFilters();
- if (isFilterDone()) {
- return false;
- }
- return returnResult;
+ return nextRaw(outResults, limit, metric);
} finally {
closeRegionOperation();
}
}
@Override
- public synchronized boolean next(List<KeyValue> outResults)
+ public boolean nextRaw(List<KeyValue> outResults, String metric)
+ throws IOException {
+ return nextRaw(outResults, batch, metric);
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> outResults, int limit,
+ String metric) throws IOException {
+ results.clear();
+
+ boolean returnResult = nextInternal(limit, metric);
+
+ outResults.addAll(results);
+ resetFilters();
+ if (isFilterDone()) {
+ return false;
+ }
+ return returnResult;
+ }
+
+ @Override
+ public boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, null);
}
@Override
- public synchronized boolean next(List<KeyValue> outResults, String metric)
+ public boolean next(List<KeyValue> outResults, String metric)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, metric);
@@ -3489,8 +3617,16 @@ public class HRegion implements HeapSize
rpcCall.throwExceptionIfCallerDisconnected();
}
- byte [] currentRow = peekRow();
- if (isStopRow(currentRow)) {
+ KeyValue current = this.storeHeap.peek();
+ byte[] currentRow = null;
+ int offset = 0;
+ short length = 0;
+ if (current != null) {
+ currentRow = current.getBuffer();
+ offset = current.getRowOffset();
+ length = current.getRowLength();
+ }
+ if (isStopRow(currentRow, offset, length)) {
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
@@ -3499,10 +3635,10 @@ public class HRegion implements HeapSize
}
return false;
- } else if (filterRowKey(currentRow)) {
- nextRow(currentRow);
+ } else if (filterRowKey(currentRow, offset, length)) {
+ nextRow(currentRow, offset, length);
} else {
- byte [] nextRow;
+ KeyValue nextKv;
do {
this.storeHeap.next(results, limit - results.size(), metric);
if (limit > 0 && results.size() == limit) {
@@ -3512,9 +3648,10 @@ public class HRegion implements HeapSize
}
return true; // we are expecting more yes, but also limited to how many we can return.
}
- } while (Bytes.equals(currentRow, nextRow = peekRow()));
+ nextKv = this.storeHeap.peek();
+ } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
- final boolean stopRow = isStopRow(nextRow);
+ final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
// now that we have an entire row, lets process with a filters:
@@ -3529,7 +3666,7 @@ public class HRegion implements HeapSize
// the reasons for calling this method are:
// 1. reset the filters.
// 2. provide a hook to fast forward the row (used by subclasses)
- nextRow(currentRow);
+ nextRow(currentRow, offset, length);
// This row was totally filtered out, if this is NOT the last row,
// we should continue on.
@@ -3545,29 +3682,25 @@ public class HRegion implements HeapSize
return filter != null
&& filter.filterRow();
}
- private boolean filterRowKey(byte[] row) {
+ private boolean filterRowKey(byte[] row, int offset, short length) {
return filter != null
- && filter.filterRowKey(row, 0, row.length);
+ && filter.filterRowKey(row, offset, length);
}
- protected void nextRow(byte [] currentRow) throws IOException {
- while (Bytes.equals(currentRow, peekRow())) {
- this.storeHeap.next(MOCKED_LIST);
+ protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
+ KeyValue next;
+ while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
+ this.storeHeap.next(MOCKED_LIST);
}
results.clear();
resetFilters();
}
- private byte[] peekRow() {
- KeyValue kv = this.storeHeap.peek();
- return kv == null ? null : kv.getRow();
- }
-
- private boolean isStopRow(byte [] currentRow) {
+ private boolean isStopRow(byte [] currentRow, int offset, short length) {
return currentRow == null ||
(stopRow != null &&
comparator.compareRows(stopRow, 0, stopRow.length,
- currentRow, 0, currentRow.length) <= isScan);
+ currentRow, offset, length) <= isScan);
}
@Override
@@ -3695,6 +3828,7 @@ public class HRegion implements HeapSize
* @param conf
* @param hTableDescriptor
* @param hlog shared HLog
+ * @param initialize true to initialize the region
* @return new HRegion
*
* @throws IOException
@@ -3702,7 +3836,36 @@ public class HRegion implements HeapSize
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog)
+ final HLog hlog,
+ final boolean initialize)
+ throws IOException {
+ return createHRegion(info, rootDir, conf, hTableDescriptor,
+ hlog, initialize, false);
+ }
+
+ /**
+ * Convenience method creating new HRegions. Used by createTable.
+ * The {@link HLog} for the created region needs to be closed
+ * explicitly, if it is not null.
+ * Use {@link HRegion#getLog()} to get access.
+ *
+ * @param info Info for region to create.
+ * @param rootDir Root directory for HBase instance
+ * @param conf
+ * @param hTableDescriptor
+ * @param hlog shared HLog
+ * @param initialize true to initialize the region
+ * @param ignoreHLog
+ * true to skip generating a new hlog if it is null, mostly for createTable
+ * @return new HRegion
+ *
+ * @throws IOException
+ */
+ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+ final Configuration conf,
+ final HTableDescriptor hTableDescriptor,
+ final HLog hlog,
+ final boolean initialize, final boolean ignoreHLog)
throws IOException {
LOG.info("creating HRegion " + info.getTableNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3714,16 +3877,26 @@ public class HRegion implements HeapSize
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
HLog effectiveHLog = hlog;
- if (hlog == null) {
+ if (hlog == null && !ignoreHLog) {
effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
}
HRegion region = HRegion.newHRegion(tableDir,
effectiveHLog, fs, conf, info, hTableDescriptor, null);
- region.initialize();
+ if (initialize) {
+ region.initialize();
+ }
return region;
}
+ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+ final Configuration conf,
+ final HTableDescriptor hTableDescriptor,
+ final HLog hlog)
+ throws IOException {
+ return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+ }
+
/**
* Open a Region.
* @param info Info for region to be opened.
@@ -4178,9 +4351,19 @@ public class HRegion implements HeapSize
//
/**
* @param get get object
+ * @return result
+ * @throws IOException read exceptions
+ */
+ public Result get(final Get get) throws IOException {
+ return get(get, null);
+ }
+
+ /**
+ * @param get get object
* @param lockid existing lock id, or null for no previous lock
* @return result
* @throws IOException read exceptions
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public Result get(final Get get, final Integer lockid) throws IOException {
checkRow(get.getRow(), "Get");
@@ -4235,7 +4418,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-
+
return results;
}
@@ -4303,7 +4486,7 @@ public class HRegion implements HeapSize
}
// 3. acquire the region lock
- this.updatesLock.readLock().lock();
+ lock(this.updatesLock.readLock(), acquiredLocks.size());
locked = true;
// 4. Get a mvcc write number
@@ -4425,6 +4608,23 @@ public class HRegion implements HeapSize
// TODO: There's a lot of boiler plate code identical
// to increment... See how to better unify that.
+
+ /**
+ *
+ * Perform one or more append operations on a row.
+ * <p>
+ * Appends performed are done under row lock but reads do not take locks out
+ * so this can be seen partially complete by gets and scans.
+ *
+ * @param append
+ * @param writeToWAL
+ * @return new keyvalues after append
+ * @throws IOException
+ */
+ public Result append(Append append, boolean writeToWAL)
+ throws IOException {
+ return append(append, null, writeToWAL);
+ }
/**
*
* Perform one or more append operations on a row.
@@ -4437,6 +4637,7 @@ public class HRegion implements HeapSize
* @param writeToWAL
* @return new keyvalues after increment
* @throws IOException
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public Result append(Append append, Integer lockid, boolean writeToWAL)
throws IOException {
@@ -4456,7 +4657,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
Integer lid = getLock(lockid, row, true);
- this.updatesLock.readLock().lock();
+ lock(this.updatesLock.readLock());
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4563,10 +4764,10 @@ public class HRegion implements HeapSize
closeRegionOperation();
}
-
+
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4576,6 +4777,22 @@ public class HRegion implements HeapSize
}
/**
+ *
+ * Perform one or more increment operations on a row.
+ * <p>
+ * Increments performed are done under row lock but reads do not take locks
+ * out so this can be seen partially complete by gets and scans.
+ * @param increment
+ * @param writeToWAL
+ * @return new keyvalues after increment
+ * @throws IOException
+ */
+ public Result increment(Increment increment, boolean writeToWAL)
+ throws IOException {
+ return increment(increment, null, writeToWAL);
+ }
+
+ /**
*
* Perform one or more increment operations on a row.
* <p>
@@ -4586,6 +4803,8 @@ public class HRegion implements HeapSize
* @param writeToWAL
* @return new keyvalues after increment
* @throws IOException
+ * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
+
*/
public Result increment(Increment increment, Integer lockid,
boolean writeToWAL)
@@ -4607,7 +4826,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
Integer lid = getLock(lockid, row, true);
- this.updatesLock.readLock().lock();
+ lock(this.updatesLock.readLock());
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4691,7 +4910,7 @@ public class HRegion implements HeapSize
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
}
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4725,7 +4944,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
Integer lid = obtainRowLock(row);
- this.updatesLock.readLock().lock();
+ lock(this.updatesLock.readLock());
try {
Store store = stores.get(family);
@@ -4818,8 +5037,8 @@ public class HRegion implements HeapSize
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 35 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
- (5 * Bytes.SIZEOF_LONG) +
+ 35 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ (7 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5119,13 +5338,16 @@ public class HRegion implements HeapSize
* #closeRegionOperation needs to be called in the try's finally block
* Acquires a read lock and checks if the region is closing or closed.
* @throws NotServingRegionException when the region is closing or closed
+ * @throws RegionTooBusyException if failed to get the lock in time
+ * @throws InterruptedIOException if interrupted while waiting for a lock
*/
- private void startRegionOperation() throws NotServingRegionException {
+ public void startRegionOperation()
+ throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing");
}
- lock.readLock().lock();
+ lock(lock.readLock());
if (this.closed.get()) {
lock.readLock().unlock();
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5137,7 +5359,7 @@ public class HRegion implements HeapSize
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
- private void closeRegionOperation(){
+ public void closeRegionOperation(){
lock.readLock().unlock();
}
@@ -5147,15 +5369,17 @@ public class HRegion implements HeapSize
* #closeBulkRegionOperation needs to be called in the try's finally block
* Acquires a writelock and checks if the region is closing or closed.
* @throws NotServingRegionException when the region is closing or closed
+ * @throws RegionTooBusyException if failed to get the lock in time
+ * @throws InterruptedIOException if interrupted while waiting for a lock
*/
private void startBulkRegionOperation(boolean writeLockNeeded)
- throws NotServingRegionException {
+ throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing");
}
- if (writeLockNeeded) lock.writeLock().lock();
- else lock.readLock().lock();
+ if (writeLockNeeded) lock(lock.writeLock());
+ else lock(lock.readLock());
if (this.closed.get()) {
if (writeLockNeeded) lock.writeLock().unlock();
else lock.readLock().unlock();
@@ -5168,7 +5392,7 @@ public class HRegion implements HeapSize
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
- private void closeBulkRegionOperation(){
+ private void closeBulkRegionOperation() {
if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
else lock.readLock().unlock();
}
@@ -5179,7 +5403,7 @@ public class HRegion implements HeapSize
*/
private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
if (numPutsWithoutWAL.getAndIncrement() == 0) {
- LOG.info("writing data to region " + this +
+ LOG.info("writing data to region " + this +
" with WAL disabled. Data may be lost in the event of a crash.");
}
@@ -5193,6 +5417,33 @@ public class HRegion implements HeapSize
dataInMemoryWithoutWAL.addAndGet(putSize);
}
+ private void lock(final Lock lock)
+ throws RegionTooBusyException, InterruptedIOException {
+ lock(lock, 1);
+ }
+
+ /**
+ * Try to acquire a lock. Throw RegionTooBusyException
+ * if failed to get the lock in time. Throw InterruptedIOException
+ * if interrupted while waiting for the lock.
+ */
+ private void lock(final Lock lock, final int multiplier)
+ throws RegionTooBusyException, InterruptedIOException {
+ try {
+ final long waitTime = Math.min(maxBusyWaitDuration,
+ busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
+ if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
+ throw new RegionTooBusyException(
+ "failed to get a lock in " + waitTime + "ms");
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted while waiting for a lock");
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ }
+
/**
* Calls sync with the given transaction ID if the region's table is not
* deferring it.
@@ -5232,7 +5483,6 @@ public class HRegion implements HeapSize
}
};
-
/**
* Facility for dumping and compacting catalog tables.
* Only does catalog tables since these are only tables we for sure know
@@ -5265,11 +5515,11 @@ public class HRegion implements HeapSize
final HLog log = new HLog(fs, logdir, oldLogDir, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
- } finally {
+ } finally {
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
- }
+ }
}
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Dec 23 20:54:12 2012
@@ -44,6 +44,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.client.Mu
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
@@ -165,6 +167,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.field.MillisDurationField;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -233,7 +236,7 @@ public class HRegionServer implements HR
// Server to handle client requests. Default access so can be accessed by
// unit tests.
RpcServer rpcServer;
-
+
// Server to handle client requests.
private HBaseServer server;
@@ -363,6 +366,8 @@ public class HRegionServer implements HR
*/
private ClusterId clusterId = null;
+ private RegionServerCoprocessorHost rsHost;
+
/**
* Starts a HRegionServer at the default location
*
@@ -434,6 +439,10 @@ public class HRegionServer implements HR
this.rpcServer.setQosFunction(new QosFunction());
this.startcode = System.currentTimeMillis();
+ // login the zookeeper client principal (if using security)
+ ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
+ "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
+
// login the server principal (if using secure Hadoop)
User.login(this.conf, "hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", this.isa.getHostName());
@@ -1013,6 +1022,7 @@ public class HRegionServer implements HR
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
+ this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
startServiceThreads();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
@@ -1020,6 +1030,7 @@ public class HRegionServer implements HR
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
isOnline = true;
} catch (Throwable e) {
+ LOG.warn("Exception in region server : ", e);
this.isOnline = false;
stop("Failed initialization");
throw convertThrowableToIOE(cleanup(e, "Failed init"),
@@ -1095,8 +1106,7 @@ public class HRegionServer implements HR
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
totalStaticIndexSizeKB, totalStaticBloomSizeKB,
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
- totalCompactingKVs, currentCompactedKVs,
- r.getCoprocessorHost().getCoprocessors());
+ totalCompactingKVs, currentCompactedKVs);
}
/**
@@ -1576,6 +1586,7 @@ public class HRegionServer implements HR
this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
this.getConfiguration(), this.getServerName().toString());
splitLogWorker.start();
+
}
/**
@@ -1643,10 +1654,15 @@ public class HRegionServer implements HR
@Override
public void stop(final String msg) {
- this.stopped = true;
- LOG.info("STOPPED: " + msg);
- // Wakes run() if it is sleeping
- sleeper.skipSleepCycle();
+ try {
+ // rsHost is only assigned part-way through initialization, and the
+ // initialization failure path itself calls stop(). Skip the hook while
+ // rsHost is still null so that path does not fail with a
+ // NullPointerException that hides the original error.
+ if (this.rsHost != null) {
+ this.rsHost.preStop(msg);
+ }
+ this.stopped = true;
+ LOG.info("STOPPED: " + msg);
+ // Wakes run() if it is sleeping
+ sleeper.skipSleepCycle();
+ } catch (IOException exp) {
+ // preStop() threw, so the stopped flag is deliberately left unset.
+ LOG.warn("The region server did not stop", exp);
+ }
}
public void waitForServerOnline(){
@@ -2430,23 +2446,32 @@ public class HRegionServer implements HR
}
}
- for (int i = 0; i < nbRows
- && currentScanResultSize < maxScannerResultSize; i++) {
- requestCount.incrementAndGet();
- // Collect values to be returned here
- boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
- if (!values.isEmpty()) {
- for (KeyValue kv : values) {
- currentScanResultSize += kv.heapSize();
+ MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
+ region.startRegionOperation();
+ try {
+ int i = 0;
+ synchronized(s) {
+ for (; i < nbRows
+ && currentScanResultSize < maxScannerResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
+ if (!values.isEmpty()) {
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
+ }
+ results.add(new Result(values));
+ }
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
}
- results.add(new Result(values));
}
- if (!moreRows) {
- break;
- }
- values.clear();
+ requestCount.addAndGet(i);
+ region.readRequestsCount.add(i);
+ } finally {
+ region.closeRegionOperation();
}
-
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
@@ -2589,6 +2614,9 @@ public class HRegionServer implements HR
return -1;
}
+ /**
+ * @deprecated {@link RowLock} and associated operations are deprecated.
+ */
public long lockRow(byte[] regionName, byte[] row) throws IOException {
checkOpen();
NullPointerException npe = null;
@@ -2605,6 +2633,9 @@ public class HRegionServer implements HR
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preLockRow(regionName, row);
+ }
Integer r = region.obtainRowLock(row);
long lockId = addRowLock(r, region);
LOG.debug("Row lock " + lockId + " explicitly acquired by client");
@@ -2648,6 +2679,9 @@ public class HRegionServer implements HR
return rl;
}
+ /**
+ * @deprecated {@link RowLock} and associated operations are deprecated.
+ */
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public void unlockRow(byte[] regionName, long lockId) throws IOException {
@@ -2666,6 +2700,9 @@ public class HRegionServer implements HR
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preUnLockRow(regionName, lockId);
+ }
String lockName = String.valueOf(lockId);
Integer r = rowlocks.remove(lockName);
if (r == null) {
@@ -2842,6 +2879,11 @@ public class HRegionServer implements HR
final int versionOfClosingNode)
throws IOException {
checkOpen();
+ //Check for permissions to close.
+ HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
+ if (actualRegion != null && actualRegion.getCoprocessorHost() != null) {
+ actualRegion.getCoprocessorHost().preClose(false);
+ }
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode);
boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
@@ -2889,6 +2931,17 @@ public class HRegionServer implements HR
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk, final int versionOfClosingNode) {
+
+ HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
+ if ((actualRegion != null) && (actualRegion.getCoprocessorHost() !=null)){
+ try {
+ actualRegion.getCoprocessorHost().preClose(abort);
+ } catch (IOException e) {
+ LOG.warn(e);
+ return false;
+ }
+ }
+
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
@@ -3589,6 +3642,10 @@ public class HRegionServer implements HR
return this.zooKeeper;
}
+ /**
+ * @return the region-server-level coprocessor host held in {@code rsHost}.
+ * NOTE(review): the field has no initializer, so this presumably returns
+ * null until server initialization has assigned it; confirm before
+ * dereferencing from early start-up code.
+ */
+ public RegionServerCoprocessorHost getCoprocessorHost(){
+ return this.rsHost;
+ }
+
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
return this.regionsInTransitionInRS;
@@ -3766,8 +3823,13 @@ public class HRegionServer implements HR
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() {
- HServerLoad hsl = buildServerLoad();
- return hsl == null? null: hsl.getCoprocessors();
+ // Sorted, de-duplicated union of the WAL coprocessors and the coprocessors
+ // of every online region.
+ TreeSet<String> coprocessors = new TreeSet<String>(
+ this.hlog.getCoprocessorHost().getCoprocessors());
+ Collection<HRegion> regions = getOnlineRegionsLocalContext();
+ for (HRegion region: regions) {
+ // A region's coprocessor host is null-checked everywhere else in this
+ // class before use; do the same here so a region without a host does not
+ // break the status page.
+ if (region.getCoprocessorHost() != null) {
+ coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
+ }
+ }
+ return coprocessors.toArray(new String[0]);
}
/**
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sun Dec 23 20:54:12 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -265,7 +264,7 @@ public class RegionCoprocessorHost
/**
* Invoked before a region open
*/
- public void preOpen() {
+ public void preOpen(){
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -285,7 +284,7 @@ public class RegionCoprocessorHost
/**
* Invoked after a region open
*/
- public void postOpen() {
+ public void postOpen(){
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -306,7 +305,7 @@ public class RegionCoprocessorHost
* Invoked before a region is closed
* @param abortRequested true if the server is aborting
*/
- public void preClose(boolean abortRequested) {
+ public void preClose(boolean abortRequested) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -314,7 +313,7 @@ public class RegionCoprocessorHost
try {
((RegionObserver)env.getInstance()).preClose(ctx, abortRequested);
} catch (Throwable e) {
- handleCoprocessorThrowableNoRethrow(env, e);
+ handleCoprocessorThrowable(env, e);
}
}
}
@@ -324,7 +323,7 @@ public class RegionCoprocessorHost
* Invoked after a region is closed
* @param abortRequested true if the server is aborting
*/
- public void postClose(boolean abortRequested) {
+ public void postClose(boolean abortRequested){
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -1483,5 +1482,31 @@ public class RegionCoprocessorHost
return hasLoaded;
}
+
+ /**
+  * Invoke the preLockRow hook of every loaded coprocessor that is a
+  * RegionObserver, stopping early once an observer marks the context complete.
+  * Exceptions thrown by an observer propagate to the caller unchanged.
+  * @param regionName name of the region holding the row
+  * @param row the row about to be locked
+  * @throws IOException if an observer throws it
+  */
+ public void preLockRow(byte[] regionName, byte[] row) throws IOException {
+   ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+   for (RegionEnvironment env : coprocessors) {
+     if (!(env.getInstance() instanceof RegionObserver)) {
+       continue;
+     }
+     ctx = ObserverContext.createAndPrepare(env, ctx);
+     RegionObserver observer = (RegionObserver) env.getInstance();
+     observer.preLockRow(ctx, regionName, row);
+     if (ctx.shouldComplete()) {
+       break;
+     }
+   }
+ }
+
+ /**
+  * Invoke the preUnlockRow hook of every loaded coprocessor that is a
+  * RegionObserver, stopping early once an observer marks the context complete.
+  * Exceptions thrown by an observer propagate to the caller unchanged.
+  * @param regionName name of the region holding the lock
+  * @param lockId id of the row lock about to be released
+  * @throws IOException if an observer throws it
+  */
+ public void preUnLockRow(byte[] regionName, long lockId) throws IOException {
+   ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+   for (RegionEnvironment env : coprocessors) {
+     if (!(env.getInstance() instanceof RegionObserver)) {
+       continue;
+     }
+     ctx = ObserverContext.createAndPrepare(env, ctx);
+     RegionObserver observer = (RegionObserver) env.getInstance();
+     observer.preUnlockRow(ctx, regionName, lockId);
+     if (ctx.shouldComplete()) {
+       break;
+     }
+   }
+ }
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sun Dec 23 20:54:12 2012
@@ -20,7 +20,10 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
/**
* RegionScanner describes iterators over rows in an HRegion.
@@ -49,4 +52,50 @@ public interface RegionScanner extends I
*/
public boolean reseek(byte[] row) throws IOException;
+ /**
+ * Callers of {@link #nextRaw(List, int, String)} pass this value to
+ * {@code MultiVersionConsistencyControl.setThreadReadPoint} before scanning.
+ * @return The Scanner's MVCC read point; see {@link MultiVersionConsistencyControl}
+ */
+ public long getMvccReadPoint();
+
+ /**
+ * Grab the next row's worth of values with the default limit on the number of values
+ * to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and
+ * synchronize on the scanner object.
+ * See {@link #nextRaw(List, int, String)}
+ * @param result return output array
+ * @param metric the metric name
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException e
+ */
+ public boolean nextRaw(List<KeyValue> result, String metric) throws IOException;
+
+ /**
+ * Grab the next row's worth of values with a limit on the number of values
+ * to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and
+ * synchronize on the scanner object.
+ * Example:
+ * <code><pre>
+ * HRegion region = ...;
+ * RegionScanner scanner = ...
+ * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ * region.startRegionOperation();
+ * try {
+ * synchronized(scanner) {
+ * ...
+ * boolean moreRows = scanner.nextRaw(values, limit, metric);
+ * ...
+ * }
+ * } finally {
+ * region.closeRegionOperation();
+ * }
+ * </pre></code>
+ * @param result return output array
+ * @param limit limit on the number of values to return
+ * @param metric the metric name
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException e
+ */
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sun Dec 23 20:54:12 2012
@@ -82,6 +82,8 @@ public class ScanQueryMatcher {
/* row is not private for tests */
/** Row the query is on */
byte [] row;
+ int rowOffset;
+ short rowLength;
/**
* Oldest put in any of the involved store files
@@ -222,7 +224,7 @@ public class ScanQueryMatcher {
short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
offset += Bytes.SIZEOF_SHORT;
- int ret = this.rowComparator.compareRows(row, 0, row.length,
+ int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
bytes, offset, rowLength);
if (ret <= -1) {
return MatchCode.DONE;
@@ -385,8 +387,10 @@ public class ScanQueryMatcher {
* Set current row
* @param row
*/
- public void setRow(byte [] row) {
+ public void setRow(byte [] row, int offset, short length) {
this.row = row;
+ this.rowOffset = offset;
+ this.rowLength = length;
reset();
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Dec 23 20:54:12 2012
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -132,9 +131,6 @@ public class Store extends SchemaConfigu
private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final String storeNameStr;
- private CompactionProgress progress;
- private final int compactionKVMax;
private final boolean verifyBulkLoads;
/* The default priority for user-specified compaction requests.
@@ -158,10 +154,6 @@ public class Store extends SchemaConfigu
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final int blocksize;
- /** Compression algorithm for flush files and minor compaction */
- private final Compression.Algorithm compression;
- /** Compression algorithm for major compaction */
- private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */
@@ -171,6 +163,8 @@ public class Store extends SchemaConfigu
// Comparing KeyValues
final KeyValue.KVComparator comparator;
+ private final Compactor compactor;
+
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -185,25 +179,16 @@ public class Store extends SchemaConfigu
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration conf)
throws IOException {
- super(conf, region.getTableDesc().getNameAsString(),
+ super(conf, region.getRegionInfo().getTableNameAsString(),
Bytes.toString(family.getName()));
- HRegionInfo info = region.regionInfo;
+ HRegionInfo info = region.getRegionInfo();
this.fs = fs;
- this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
- if (!this.fs.exists(this.homedir)) {
- if (!this.fs.mkdirs(this.homedir))
- throw new IOException("Failed create of: " + this.homedir.toString());
- }
+ Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+ this.homedir = createStoreHomeDir(this.fs, p);
this.region = region;
this.family = family;
this.conf = conf;
this.blocksize = family.getBlocksize();
- this.compression = family.getCompression();
- // avoid overriding compression setting for major compactions if the user
- // has not specified it separately
- this.compactionCompression =
- (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
- family.getCompactionCompression() : this.compression;
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -228,7 +213,6 @@ public class Store extends SchemaConfigu
"ms in store " + this);
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
- this.storeNameStr = getColumnFamilyName();
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
@@ -245,10 +229,8 @@ public class Store extends SchemaConfigu
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
- this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
- false);
+ this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (Store.closeCheckInterval == 0) {
Store.closeCheckInterval = conf.getInt(
@@ -260,6 +242,47 @@ public class Store extends SchemaConfigu
this.checksumType = getChecksumType(conf);
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
+ // Create a compaction tool instance
+ this.compactor = new Compactor(this.conf);
+ }
+
+ /**
+  * Translate a column family's time-to-live into milliseconds.
+  * @param family column family descriptor whose TTL (in seconds) is read
+  * @return the TTL in milliseconds, or Long.MAX_VALUE when the family's TTL
+  *   is HConstants.FOREVER or -1 (both mean unlimited)
+  */
+ long getTTL(final HColumnDescriptor family) {
+   // HCD.getTimeToLive returns ttl in seconds.
+   final long ttlSeconds = family.getTimeToLive();
+   final boolean unlimited =
+       ttlSeconds == HConstants.FOREVER || ttlSeconds == -1;
+   // Second -> ms adjust for user data
+   return unlimited ? Long.MAX_VALUE : ttlSeconds * 1000;
+ }
+
+ /**
+  * Make sure this store's home directory exists, creating it when absent.
+  * @param fs filesystem to check and create the directory on
+  * @param homedir directory the store keeps its files in
+  * @return <code>homedir</code>, unchanged
+  * @throws IOException if the directory is missing and could not be created
+  */
+ Path createStoreHomeDir(final FileSystem fs,
+     final Path homedir) throws IOException {
+   final boolean alreadyThere = fs.exists(homedir);
+   if (!alreadyThere && !fs.mkdirs(homedir)) {
+     throw new IOException("Failed create of: " + homedir.toString());
+   }
+   return homedir;
+ }
+
+ /**
+ * @return the FileSystem this store was constructed with
+ */
+ FileSystem getFileSystem() {
+ return this.fs;
}
/**
@@ -320,7 +343,7 @@ public class Store extends SchemaConfigu
* Return the directory in which this store stores its
* StoreFiles
*/
- public Path getHomedir() {
+ Path getHomedir() {
return homedir;
}
@@ -339,6 +362,10 @@ public class Store extends SchemaConfigu
this.dataBlockEncoder = blockEncoder;
}
+ /**
+ * List the entries currently in this store's home directory.
+ * @return the result of {@code FSUtils.listStatus} on the homedir with a null
+ * filter; {@code loadStoreFiles()} treats a null or empty result as "no files"
+ * @throws IOException if the listing fails
+ */
+ FileStatus [] getStoreFiles() throws IOException {
+ return FSUtils.listStatus(this.fs, this.homedir, null);
+ }
+
/**
* Creates an unsorted list of StoreFile loaded in parallel
* from the given directory.
@@ -346,7 +373,7 @@ public class Store extends SchemaConfigu
*/
private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
- FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
+ FileStatus files[] = getStoreFiles();
if (files == null || files.length == 0) {
return results;
@@ -637,7 +664,7 @@ public class Store extends SchemaConfigu
storeFileCloserThreadPool.shutdownNow();
}
}
- LOG.debug("closed " + this.storeNameStr);
+ LOG.info("Closed " + this);
return result;
} finally {
this.lock.writeLock().unlock();
@@ -723,6 +750,7 @@ public class Store extends SchemaConfigu
scanner = cpScanner;
}
try {
+ int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
@@ -736,7 +764,7 @@ public class Store extends SchemaConfigu
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
- hasMore = scanner.next(kvs, this.compactionKVMax);
+ hasMore = scanner.next(kvs, compactionKVMax);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
@@ -828,7 +856,7 @@ public class Store extends SchemaConfigu
*/
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException {
- return createWriterInTmp(maxKeyCount, this.compression, false);
+ return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
}
/*
@@ -981,16 +1009,12 @@ public class Store extends SchemaConfigu
* @param cr
* compaction details obtained from requestCompaction()
* @throws IOException
+ * @return Storefile we compacted into or null if we failed or opted out early.
*/
- void compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) {
- return;
- }
- Preconditions.checkArgument(cr.getStore().toString()
- .equals(this.toString()));
-
+ StoreFile compact(CompactionRequest cr) throws IOException {
+ if (cr == null || cr.getFiles().isEmpty()) return null;
+ Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
List<StoreFile> filesToCompact = cr.getFiles();
-
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
@@ -1002,19 +1026,26 @@ public class Store extends SchemaConfigu
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this.storeNameStr + " of "
+ + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
try {
- StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
- maxId);
+ StoreFile.Writer writer =
+ this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place.
- sf = completeCompaction(filesToCompact, writer);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
+ if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+ sf = completeCompaction(filesToCompact, writer);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postCompact(this, sf);
+ }
+ } else {
+ // Create storefile around what we wrote with a reader on it.
+ sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
+ sf.createReader();
}
} finally {
synchronized (filesCompacting) {
@@ -1023,7 +1054,7 @@ public class Store extends SchemaConfigu
}
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ + filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " +
(sf == null ? "none" : sf.getPath().getName()) +
@@ -1031,6 +1062,7 @@ public class Store extends SchemaConfigu
StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is "
+ StringUtils.humanReadableInt(storeSize));
+ return sf;
}
/**
@@ -1070,7 +1102,8 @@ public class Store extends SchemaConfigu
try {
// Ready to go. Have list of files to compact.
- StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
+ StoreFile.Writer writer =
+ this.compactor.compact(this, filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
@@ -1119,10 +1152,10 @@ public class Store extends SchemaConfigu
}
/** getter for CompactionProgress object
- * @return CompactionProgress object
+ * @return CompactionProgress object; can be null
*/
public CompactionProgress getCompactionProgress() {
- return this.progress;
+ return this.compactor.getProgress();
}
/*
@@ -1174,19 +1207,19 @@ public class Store extends SchemaConfigu
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this.storeNameStr +
+ LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
@@ -1376,12 +1409,12 @@ public class Store extends SchemaConfigu
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
- compactSelection.clearSubList(0, pos);
+ if (pos != 0) compactSelection.clearSubList(0, pos);
}
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this.storeNameStr + ": no store files to compact");
+ this + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
@@ -1468,7 +1501,7 @@ public class Store extends SchemaConfigu
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this.storeNameStr
+ LOG.debug("Skipped compaction of " + this
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
@@ -1495,149 +1528,6 @@ public class Store extends SchemaConfigu
}
/**
- * Do a minor/major compaction on an explicit set of storefiles in a Store.
- * Uses the scan infrastructure to make it easy.
- *
- * @param filesToCompact which files to compact
- * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
- * @param maxId Readers maximum sequence id.
- * @return Product of compaction or null if all cells expired or deleted and
- * nothing made it through the compaction.
- * @throws IOException
- */
- StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
- final boolean majorCompaction, final long maxId)
- throws IOException {
- // calculate maximum key count after compaction (for blooms)
- int maxKeyCount = 0;
- long earliestPutTs = HConstants.LATEST_TIMESTAMP;
- for (StoreFile file : filesToCompact) {
- StoreFile.Reader r = file.getReader();
- if (r != null) {
- // NOTE: getFilterEntries could cause under-sized blooms if the user
- // switches bloom type (e.g. from ROW to ROWCOL)
- long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
- ? r.getFilterEntries() : r.getEntries();
- maxKeyCount += keyCount;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compacting " + file +
- ", keycount=" + keyCount +
- ", bloomtype=" + r.getBloomFilterType().toString() +
- ", size=" + StringUtils.humanReadableInt(r.length()) +
- ", encoding=" + r.getHFileReader().getEncodingOnDisk());
- }
- }
- // For major compactions calculate the earliest put timestamp
- // of all involved storefiles. This is used to remove
- // family delete marker during the compaction.
- if (majorCompaction) {
- byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
- if (tmp == null) {
- // there's a file with no information, must be an old one
- // assume we have very old puts
- earliestPutTs = HConstants.OLDEST_TIMESTAMP;
- } else {
- earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
- }
- }
- }
-
- // keep track of compaction progress
- progress = new CompactionProgress(maxKeyCount);
-
- // For each file, obtain a scanner:
- List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false, true);
-
- // Make the instantiation lazy in case compaction produces no product; i.e.
- // where all source cells are expired or deleted.
- StoreFile.Writer writer = null;
- // Find the smallest read point across all the Scanners.
- long smallestReadPoint = region.getSmallestReadPoint();
- MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
- try {
- InternalScanner scanner = null;
- try {
- if (getHRegion().getCoprocessorHost() != null) {
- scanner = getHRegion()
- .getCoprocessorHost()
- .preCompactScannerOpen(this, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
- }
- if (scanner == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(getFamily().getMaxVersions());
- /* Include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
- majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
- smallestReadPoint, earliestPutTs);
- }
- if (getHRegion().getCoprocessorHost() != null) {
- InternalScanner cpScanner =
- getHRegion().getCoprocessorHost().preCompact(this, scanner);
- // NULL scanner returned from coprocessor hooks means skip normal processing
- if (cpScanner == null) {
- return null;
- }
- scanner = cpScanner;
- }
-
- int bytesWritten = 0;
- // since scanner.next() can return 'false' but still be delivering data,
- // we have to use a do/while loop.
- ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
- // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
- boolean hasMore;
- do {
- hasMore = scanner.next(kvs, this.compactionKVMax);
- if (writer == null && !kvs.isEmpty()) {
- writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
- true);
- }
- if (writer != null) {
- // output to writer:
- for (KeyValue kv : kvs) {
- if (kv.getMemstoreTS() <= smallestReadPoint) {
- kv.setMemstoreTS(0);
- }
- writer.append(kv);
- // update progress per key
- ++progress.currentCompactedKVs;
-
- // check periodically to see if a system stop is requested
- if (Store.closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > Store.closeCheckInterval) {
- bytesWritten = 0;
- if (!this.region.areWritesEnabled()) {
- writer.close();
- fs.delete(writer.getPath(), false);
- throw new InterruptedIOException(
- "Aborting compaction of store " + this +
- " in region " + this.region +
- " because user requested stop.");
- }
- }
- }
- }
- }
- kvs.clear();
- } while (hasMore);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- } finally {
- if (writer != null) {
- writer.appendMetadata(maxId, majorCompaction);
- writer.close();
- }
- }
- return writer;
- }
-
- /**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
*
@@ -1741,7 +1631,7 @@ public class Store extends SchemaConfigu
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+ LOG.error("Failed replacing compacted files in " + this +
". Compacted file is " + (result == null? "none": result.toString()) +
". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e);
@@ -2027,7 +1917,7 @@ public class Store extends SchemaConfigu
return mk.getRow();
}
} catch(IOException e) {
- LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+ LOG.warn("Failed getting store size for " + this, e);
} finally {
this.lock.readLock().unlock();
}
@@ -2080,7 +1970,7 @@ public class Store extends SchemaConfigu
@Override
public String toString() {
- return this.storeNameStr;
+ return getColumnFamilyName();
}
/**
@@ -2196,7 +2086,7 @@ public class Store extends SchemaConfigu
}
HRegionInfo getHRegionInfo() {
- return this.region.regionInfo;
+ return this.region.getRegionInfo();
}
/**
@@ -2324,8 +2214,8 @@ public class Store extends SchemaConfigu
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Dec 23 20:54:12 2012
@@ -320,7 +320,7 @@ public class StoreFile extends SchemaCon
* @return Calculated path to parent region file.
* @throws IOException
*/
- static Path getReferredToFile(final Path p) {
+ public static Path getReferredToFile(final Path p) {
Matcher m = REF_NAME_PARSER.matcher(p.getName());
if (m == null || !m.matches()) {
LOG.warn("Failed match of store file name " + p.toString());
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Dec 23 20:54:12 2012
@@ -340,8 +340,11 @@ public class StoreScanner extends NonLaz
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
- if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
- matcher.setRow(peeked.getRow());
+ byte[] row = peeked.getBuffer();
+ int offset = peeked.getRowOffset();
+ short length = peeked.getRowLength();
+ if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
+ matcher.setRow(row, offset, length);
}
KeyValue kv;
@@ -521,9 +524,12 @@ public class StoreScanner extends NonLaz
if (kv == null) {
kv = lastTopKey;
}
- if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
+ byte[] row = kv.getBuffer();
+ int offset = kv.getRowOffset();
+ short length = kv.getRowLength();
+ if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
matcher.reset();
- matcher.setRow(kv.getRow());
+ matcher.setRow(row, offset, length);
}
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Sun Dec 23 20:54:12 2012
@@ -49,5 +49,4 @@ public class CompactionProgress {
public float getProgressPct() {
return currentCompactedKVs / totalCompactingKVs;
}
-
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Sun Dec 23 20:54:12 2012
@@ -143,7 +143,12 @@ public class Compressor {
// the status byte also acts as the higher order byte of the dictionary
// entry
short dictIdx = toShort(status, in.readByte());
- byte[] entry = dict.getEntry(dictIdx);
+ byte[] entry;
+ try {
+ entry = dict.getEntry(dictIdx);
+ } catch (Exception ex) {
+ throw new IOException("Unable to uncompress the log entry", ex);
+ }
if (entry == null) {
throw new IOException("Missing dictionary entry for index "
+ dictIdx);
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sun Dec 23 20:54:12 2012
@@ -167,6 +167,7 @@ public class HLog implements Syncable {
Entry next(Entry reuse) throws IOException;
void seek(long pos) throws IOException;
long getPosition() throws IOException;
+ void reset() throws IOException;
}
public interface Writer {
@@ -695,15 +696,18 @@ public class HLog implements Syncable {
/**
* Get a reader for the WAL.
+ * The proper way to tail a log that can be under construction is to first use this method
+ * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
+ * take care of keeping implementation-specific context (like compression).
* @param fs
* @param path
* @param conf
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
- public static Reader getReader(final FileSystem fs,
- final Path path, Configuration conf)
- throws IOException {
+ public static Reader getReader(final FileSystem fs, final Path path,
+ Configuration conf)
+ throws IOException {
try {
if (logReaderClass == null) {
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Sun Dec 23 20:54:12 2012
@@ -139,15 +139,17 @@ public class SequenceFileLogReader imple
Configuration conf;
WALReader reader;
+ FileSystem fs;
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
+ boolean emptyCompressionContext = true;
/**
* Compression context to use reading. Can be null if no compression.
*/
- private CompressionContext compressionContext = null;
+ protected CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@@ -173,6 +175,7 @@ public class SequenceFileLogReader imple
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
+ this.fs = fs;
// If compression is enabled, new dictionaries are created here.
boolean compression = reader.isWALCompressionEnabled();
@@ -237,11 +240,22 @@ public class SequenceFileLogReader imple
throw addFileInfoToException(ioe);
}
edit++;
+ if (compressionContext != null && emptyCompressionContext) {
+ emptyCompressionContext = false;
+ }
return b? e: null;
}
@Override
public void seek(long pos) throws IOException {
+ if (compressionContext != null && emptyCompressionContext) {
+ while (next() != null) {
+ if (getPosition() == pos) {
+ emptyCompressionContext = false;
+ break;
+ }
+ }
+ }
try {
reader.seek(pos);
} catch (IOException ioe) {
@@ -286,4 +300,11 @@ public class SequenceFileLogReader imple
return ioe;
}
+
+ @Override
+ public void reset() throws IOException {
+ // Resetting the reader lets us see newly added data if the file is being written to
+ // We also keep the same compressionContext which was previously populated for this file
+ reader = new WALReader(fs, path, conf);
+ }
}
\ No newline at end of file