You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/23 20:34:56 UTC
svn commit: r1425513 [3/7] - in /hbase/branches/0.94-test: ./ bin/ conf/
security/src/main/java/org/apache/hadoop/hbase/ipc/
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/test/java/org/apache/hadoop/hbase/security/access/...
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Dec 23 19:34:53 2012
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -56,14 +55,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -79,7 +76,6 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -232,34 +228,15 @@ public class HRegion implements HeapSize
* The directory for the table this region is part of.
* This directory contains the directory for this region.
*/
- private final Path tableDir;
+ final Path tableDir;
- private final HLog log;
- private final FileSystem fs;
- private final Configuration conf;
- private final int rowLockWaitDuration;
+ final HLog log;
+ final FileSystem fs;
+ final Configuration conf;
+ final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
-
- // The internal wait duration to acquire a lock before read/update
- // from the region. It is not per row. The purpose of this wait time
- // is to avoid waiting a long time while the region is busy, so that
- // we can release the IPC handler soon enough to improve the
- // availability of the region server. It can be adjusted by
- // tuning configuration "hbase.busy.wait.duration".
- final long busyWaitDuration;
- static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-
- // If updating multiple rows in one call, wait longer,
- // i.e. waiting for busyWaitDuration * # of rows. However,
- // we can limit the max multiplier.
- final int maxBusyWaitMultiplier;
-
- // Max busy wait duration. There is no point to wait longer than the RPC
- // purge timeout, when a RPC call will be terminated by the RPC engine.
- final long maxBusyWaitDuration;
-
- private final HRegionInfo regionInfo;
- private final Path regiondir;
+ final HRegionInfo regionInfo;
+ final Path regiondir;
KeyValue.KVComparator comparator;
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -377,10 +354,6 @@ public class HRegion implements HeapSize
this.coprocessorHost = null;
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.opMetrics = new OperationMetrics();
-
- this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
- this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
- this.maxBusyWaitMultiplier = 2;
}
/**
@@ -427,17 +400,6 @@ public class HRegion implements HeapSize
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.opMetrics = new OperationMetrics(conf, this.regionInfo);
- this.busyWaitDuration = conf.getLong(
- "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
- this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
- if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
- throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
- + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
- + maxBusyWaitMultiplier + "). Their product should be positive");
- }
- this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
- 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
/*
* timestamp.slop provides a server-side constraint on the timestamp. This
* assumes that you base your TS around currentTimeMillis(). In this case,
@@ -726,7 +688,7 @@ public class HRegion implements HeapSize
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
if (this.rsAccounting != null) {
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
- }
+ }
return this.memstoreSize.getAndAdd(memStoreSize);
}
@@ -752,7 +714,7 @@ public class HRegion implements HeapSize
// and then create the file
Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-
+
// if datanode crashes or if the RS goes down just before the close is called while trying to
// close the created regioninfo file in the .tmp directory then on next
// creation we will be getting AlreadyCreatedException.
@@ -760,7 +722,7 @@ public class HRegion implements HeapSize
if (FSUtils.isExists(fs, tmpPath)) {
FSUtils.delete(fs, tmpPath, true);
}
-
+
FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
try {
@@ -777,26 +739,6 @@ public class HRegion implements HeapSize
}
}
- /**
- * @param fs
- * @param dir
- * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
- * @throws IOException
- */
- public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
- throws IOException {
- Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
- if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
- FSDataInputStream in = fs.open(regioninfo);
- try {
- HRegionInfo hri = new HRegionInfo();
- hri.readFields(in);
- return hri;
- } finally {
- in.close();
- }
- }
-
/** @return a HRegionInfo object for this region */
public HRegionInfo getRegionInfo() {
return this.regionInfo;
@@ -941,7 +883,6 @@ public class HRegion implements HeapSize
this.closing.set(true);
status.setStatus("Disabling writes for close");
- // block waiting for the lock for closing
lock.writeLock().lock();
try {
if (this.isClosed()) {
@@ -1043,16 +984,19 @@ public class HRegion implements HeapSize
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
}
- static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+ private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) {
- return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- public Thread newThread(Runnable r) {
- return new Thread(r, threadNamePrefix + "-" + count++);
- }
- });
+ ThreadPoolExecutor openAndCloseThreadPool = Threads
+ .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+ return t;
+ }
+ });
+ return openAndCloseThreadPool;
}
/**
@@ -1248,7 +1192,6 @@ public class HRegion implements HeapSize
return false;
}
Preconditions.checkArgument(cr.getHRegion().equals(this));
- // block waiting for the lock for compaction
lock.readLock().lock();
MonitoredTask status = TaskMonitor.get().createStatus(
"Compacting " + cr.getStore() + " in " + this);
@@ -1328,7 +1271,6 @@ public class HRegion implements HeapSize
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.setStatus("Acquiring readlock on region");
- // block waiting for the lock for flushing cache
lock.readLock().lock();
try {
if (this.closed.get()) {
@@ -1464,7 +1406,6 @@ public class HRegion implements HeapSize
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
status.setStatus("Obtaining lock to block concurrent updates");
- // block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
long flushsize = this.memstoreSize.get();
status.setStatus("Preparing to flush by snapshotting stores");
@@ -1723,23 +1664,11 @@ public class HRegion implements HeapSize
//////////////////////////////////////////////////////////////////////////////
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
-
- /**
- * @param delete delete object
- * @param writeToWAL append to the write ahead lock or not
- * @throws IOException read exceptions
- */
- public void delete(Delete delete, boolean writeToWAL)
- throws IOException {
- delete(delete, null, writeToWAL);
- }
-
/**
* @param delete delete object
* @param lockid existing lock id, or null for grab a lock
* @param writeToWAL append to the write ahead lock or not
* @throws IOException read exceptions
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public void delete(Delete delete, Integer lockid, boolean writeToWAL)
throws IOException {
@@ -1855,7 +1784,7 @@ public class HRegion implements HeapSize
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
- lock(updatesLock.readLock());
+ updatesLock.readLock().lock();
try {
prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
@@ -1914,7 +1843,6 @@ public class HRegion implements HeapSize
* @param put
* @param lockid
* @throws IOException
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public void put(Put put, Integer lockid) throws IOException {
this.put(put, lockid, put.getWriteToWAL());
@@ -1927,7 +1855,6 @@ public class HRegion implements HeapSize
* @param lockid
* @param writeToWAL
* @throws IOException
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public void put(Put put, Integer lockid, boolean writeToWAL)
throws IOException {
@@ -2013,7 +1940,7 @@ public class HRegion implements HeapSize
System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
return batchMutate(mutationsAndLocks);
}
-
+
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
@@ -2236,7 +2163,7 @@ public class HRegion implements HeapSize
}
}
- lock(this.updatesLock.readLock(), numReadyToWrite);
+ this.updatesLock.readLock().lock();
locked = true;
//
@@ -2367,7 +2294,7 @@ public class HRegion implements HeapSize
// do after lock
final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-
+
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
// null will be treated as unknown.
@@ -2401,24 +2328,6 @@ public class HRegion implements HeapSize
//the getting of the lock happens before, so that you would just pass it into
//the methods. So in the case of checkAndMutate you could just do lockRow,
//get, put, unlockRow or something
- /**
- *
- * @param row
- * @param family
- * @param qualifier
- * @param compareOp
- * @param comparator
- * @param writeToWAL
- * @throws IOException
- * @return true if the new put was execute, false otherwise
- */
- public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
- CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
- boolean writeToWAL)
- throws IOException {
- return checkAndMutate(row, family, qualifier, compareOp, comparator, w, null, writeToWAL);
- }
-
/**
*
* @param row
@@ -2430,7 +2339,6 @@ public class HRegion implements HeapSize
* @param writeToWAL
* @throws IOException
* @return true if the new put was execute, false otherwise
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
@@ -2549,8 +2457,7 @@ public class HRegion implements HeapSize
* this and the synchronize on 'this' inside in internalFlushCache to send
* the notify.
*/
- private void checkResources()
- throws RegionTooBusyException, InterruptedIOException {
+ private void checkResources() {
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return;
@@ -2568,30 +2475,12 @@ public class HRegion implements HeapSize
" is >= than blocking " +
StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
}
- long now = EnvironmentEdgeManager.currentTimeMillis();
- long timeToWait = startTime + busyWaitDuration - now;
- if (timeToWait <= 0L) {
- final long totalTime = now - startTime;
- this.updatesBlockedMs.add(totalTime);
- LOG.info("Failed to unblock updates for region " + this + " '"
- + Thread.currentThread().getName() + "' in " + totalTime
- + "ms. The region is still busy.");
- throw new RegionTooBusyException("region is flushing");
- }
blocked = true;
synchronized(this) {
try {
- wait(Math.min(timeToWait, threadWakeFrequency));
- } catch (InterruptedException ie) {
- final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
- if (totalTime > 0) {
- this.updatesBlockedMs.add(totalTime);
- }
- LOG.info("Interrupted while waiting to unblock updates for region "
- + this + " '" + Thread.currentThread().getName() + "'");
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
+ wait(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
}
@@ -2658,7 +2547,7 @@ public class HRegion implements HeapSize
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
- lock(this.updatesLock.readLock());
+ this.updatesLock.readLock().lock();
try {
checkFamilies(familyMap.keySet());
checkTimestamps(familyMap, now);
@@ -2689,7 +2578,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -3283,7 +3172,6 @@ public class HRegion implements HeapSize
* @param lockId The lock ID to release.
*/
public void releaseRowLock(final Integer lockId) {
- if (lockId == null) return; // null lock id, do nothing
HashedBytes rowKey = lockIds.remove(lockId);
if (rowKey == null) {
LOG.warn("Release unknown lockId: " + lockId);
@@ -3524,10 +3412,6 @@ public class HRegion implements HeapSize
this(scan, null);
}
- @Override
- public long getMvccReadPoint() {
- return this.readPt;
- }
/**
* Reset both the filter and the old filter.
*/
@@ -3538,7 +3422,7 @@ public class HRegion implements HeapSize
}
@Override
- public boolean next(List<KeyValue> outResults, int limit)
+ public synchronized boolean next(List<KeyValue> outResults, int limit)
throws IOException {
return next(outResults, limit, null);
}
@@ -3558,42 +3442,30 @@ public class HRegion implements HeapSize
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
- return nextRaw(outResults, limit, metric);
- } finally {
- closeRegionOperation();
- }
- }
+ results.clear();
- @Override
- public boolean nextRaw(List<KeyValue> outResults, String metric)
- throws IOException {
- return nextRaw(outResults, batch, metric);
- }
-
- @Override
- public boolean nextRaw(List<KeyValue> outResults, int limit,
- String metric) throws IOException {
- results.clear();
-
- boolean returnResult = nextInternal(limit, metric);
+ boolean returnResult = nextInternal(limit, metric);
- outResults.addAll(results);
- resetFilters();
- if (isFilterDone()) {
- return false;
+ outResults.addAll(results);
+ resetFilters();
+ if (isFilterDone()) {
+ return false;
+ }
+ return returnResult;
+ } finally {
+ closeRegionOperation();
}
- return returnResult;
}
@Override
- public boolean next(List<KeyValue> outResults)
+ public synchronized boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, null);
}
@Override
- public boolean next(List<KeyValue> outResults, String metric)
+ public synchronized boolean next(List<KeyValue> outResults, String metric)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, metric);
@@ -3617,16 +3489,8 @@ public class HRegion implements HeapSize
rpcCall.throwExceptionIfCallerDisconnected();
}
- KeyValue current = this.storeHeap.peek();
- byte[] currentRow = null;
- int offset = 0;
- short length = 0;
- if (current != null) {
- currentRow = current.getBuffer();
- offset = current.getRowOffset();
- length = current.getRowLength();
- }
- if (isStopRow(currentRow, offset, length)) {
+ byte [] currentRow = peekRow();
+ if (isStopRow(currentRow)) {
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
@@ -3635,10 +3499,10 @@ public class HRegion implements HeapSize
}
return false;
- } else if (filterRowKey(currentRow, offset, length)) {
- nextRow(currentRow, offset, length);
+ } else if (filterRowKey(currentRow)) {
+ nextRow(currentRow);
} else {
- KeyValue nextKv;
+ byte [] nextRow;
do {
this.storeHeap.next(results, limit - results.size(), metric);
if (limit > 0 && results.size() == limit) {
@@ -3648,10 +3512,9 @@ public class HRegion implements HeapSize
}
return true; // we are expecting more yes, but also limited to how many we can return.
}
- nextKv = this.storeHeap.peek();
- } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ } while (Bytes.equals(currentRow, nextRow = peekRow()));
- final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+ final boolean stopRow = isStopRow(nextRow);
// now that we have an entire row, lets process with a filters:
@@ -3666,7 +3529,7 @@ public class HRegion implements HeapSize
// the reasons for calling this method are:
// 1. reset the filters.
// 2. provide a hook to fast forward the row (used by subclasses)
- nextRow(currentRow, offset, length);
+ nextRow(currentRow);
// This row was totally filtered out, if this is NOT the last row,
// we should continue on.
@@ -3682,25 +3545,29 @@ public class HRegion implements HeapSize
return filter != null
&& filter.filterRow();
}
- private boolean filterRowKey(byte[] row, int offset, short length) {
+ private boolean filterRowKey(byte[] row) {
return filter != null
- && filter.filterRowKey(row, offset, length);
+ && filter.filterRowKey(row, 0, row.length);
}
- protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
- KeyValue next;
- while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
- this.storeHeap.next(MOCKED_LIST);
+ protected void nextRow(byte [] currentRow) throws IOException {
+ while (Bytes.equals(currentRow, peekRow())) {
+ this.storeHeap.next(MOCKED_LIST);
}
results.clear();
resetFilters();
}
- private boolean isStopRow(byte [] currentRow, int offset, short length) {
+ private byte[] peekRow() {
+ KeyValue kv = this.storeHeap.peek();
+ return kv == null ? null : kv.getRow();
+ }
+
+ private boolean isStopRow(byte [] currentRow) {
return currentRow == null ||
(stopRow != null &&
comparator.compareRows(stopRow, 0, stopRow.length,
- currentRow, offset, length) <= isScan);
+ currentRow, 0, currentRow.length) <= isScan);
}
@Override
@@ -3828,7 +3695,6 @@ public class HRegion implements HeapSize
* @param conf
* @param hTableDescriptor
* @param hlog shared HLog
- * @param boolean initialize - true to initialize the region
* @return new HRegion
*
* @throws IOException
@@ -3836,36 +3702,7 @@ public class HRegion implements HeapSize
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize)
- throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor,
- hlog, initialize, false);
- }
-
- /**
- * Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
- * explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
- *
- * @param info Info for region to create.
- * @param rootDir Root directory for HBase instance
- * @param conf
- * @param hTableDescriptor
- * @param hlog shared HLog
- * @param boolean initialize - true to initialize the region
- * @param boolean ignoreHLog
- - true to skip generate new hlog if it is null, mostly for createTable
- * @return new HRegion
- *
- * @throws IOException
- */
- public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
- final Configuration conf,
- final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final HLog hlog)
throws IOException {
LOG.info("creating HRegion " + info.getTableNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3877,26 +3714,16 @@ public class HRegion implements HeapSize
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
HLog effectiveHLog = hlog;
- if (hlog == null && !ignoreHLog) {
+ if (hlog == null) {
effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
}
HRegion region = HRegion.newHRegion(tableDir,
effectiveHLog, fs, conf, info, hTableDescriptor, null);
- if (initialize) {
- region.initialize();
- }
+ region.initialize();
return region;
}
- public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
- final Configuration conf,
- final HTableDescriptor hTableDescriptor,
- final HLog hlog)
- throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
- }
-
/**
* Open a Region.
* @param info Info for region to be opened.
@@ -4351,19 +4178,9 @@ public class HRegion implements HeapSize
//
/**
* @param get get object
- * @return result
- * @throws IOException read exceptions
- */
- public Result get(final Get get) throws IOException {
- return get(get, null);
- }
-
- /**
- * @param get get object
* @param lockid existing lock id, or null for no previous lock
* @return result
* @throws IOException read exceptions
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public Result get(final Get get, final Integer lockid) throws IOException {
checkRow(get.getRow(), "Get");
@@ -4418,7 +4235,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-
+
return results;
}
@@ -4486,7 +4303,7 @@ public class HRegion implements HeapSize
}
// 3. acquire the region lock
- lock(this.updatesLock.readLock(), acquiredLocks.size());
+ this.updatesLock.readLock().lock();
locked = true;
// 4. Get a mvcc write number
@@ -4608,23 +4425,6 @@ public class HRegion implements HeapSize
// TODO: There's a lot of boiler plate code identical
// to increment... See how to better unify that.
-
- /**
- *
- * Perform one or more append operations on a row.
- * <p>
- * Appends performed are done under row lock but reads do not take locks out
- * so this can be seen partially complete by gets and scans.
- *
- * @param append
- * @param writeToWAL
- * @return new keyvalues after increment
- * @throws IOException
- */
- public Result append(Append append, boolean writeToWAL)
- throws IOException {
- return append(append, null, writeToWAL);
- }
/**
*
* Perform one or more append operations on a row.
@@ -4637,7 +4437,6 @@ public class HRegion implements HeapSize
* @param writeToWAL
* @return new keyvalues after increment
* @throws IOException
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
*/
public Result append(Append append, Integer lockid, boolean writeToWAL)
throws IOException {
@@ -4657,7 +4456,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
Integer lid = getLock(lockid, row, true);
- lock(this.updatesLock.readLock());
+ this.updatesLock.readLock().lock();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4764,10 +4563,10 @@ public class HRegion implements HeapSize
closeRegionOperation();
}
-
+
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4777,22 +4576,6 @@ public class HRegion implements HeapSize
}
/**
- *
- * Perform one or more increment operations on a row.
- * <p>
- * Increments performed are done under row lock but reads do not take locks
- * out so this can be seen partially complete by gets and scans.
- * @param increment
- * @param writeToWAL
- * @return new keyvalues after increment
- * @throws IOException
- */
- public Result increment(Increment increment, boolean writeToWAL)
- throws IOException {
- return increment(increment, null, writeToWAL);
- }
-
- /**
*
* Perform one or more increment operations on a row.
* <p>
@@ -4803,8 +4586,6 @@ public class HRegion implements HeapSize
* @param writeToWAL
* @return new keyvalues after increment
* @throws IOException
- * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
-
*/
public Result increment(Increment increment, Integer lockid,
boolean writeToWAL)
@@ -4826,7 +4607,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
Integer lid = getLock(lockid, row, true);
- lock(this.updatesLock.readLock());
+ this.updatesLock.readLock().lock();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4910,7 +4691,7 @@ public class HRegion implements HeapSize
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
}
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4944,7 +4725,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
Integer lid = obtainRowLock(row);
- lock(this.updatesLock.readLock());
+ this.updatesLock.readLock().lock();
try {
Store store = stores.get(family);
@@ -5037,8 +4818,8 @@ public class HRegion implements HeapSize
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 35 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (7 * Bytes.SIZEOF_LONG) +
+ 35 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+ (5 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5338,16 +5119,13 @@ public class HRegion implements HeapSize
* #closeRegionOperation needs to be called in the try's finally block
* Acquires a read lock and checks if the region is closing or closed.
* @throws NotServingRegionException when the region is closing or closed
- * @throws RegionTooBusyException if failed to get the lock in time
- * @throws InterruptedIOException if interrupted while waiting for a lock
*/
- public void startRegionOperation()
- throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
+ private void startRegionOperation() throws NotServingRegionException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing");
}
- lock(lock.readLock());
+ lock.readLock().lock();
if (this.closed.get()) {
lock.readLock().unlock();
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5359,7 +5137,7 @@ public class HRegion implements HeapSize
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
- public void closeRegionOperation(){
+ private void closeRegionOperation(){
lock.readLock().unlock();
}
@@ -5369,17 +5147,15 @@ public class HRegion implements HeapSize
* #closeBulkRegionOperation needs to be called in the try's finally block
* Acquires a writelock and checks if the region is closing or closed.
* @throws NotServingRegionException when the region is closing or closed
- * @throws RegionTooBusyException if failed to get the lock in time
- * @throws InterruptedIOException if interrupted while waiting for a lock
*/
private void startBulkRegionOperation(boolean writeLockNeeded)
- throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
+ throws NotServingRegionException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing");
}
- if (writeLockNeeded) lock(lock.writeLock());
- else lock(lock.readLock());
+ if (writeLockNeeded) lock.writeLock().lock();
+ else lock.readLock().lock();
if (this.closed.get()) {
if (writeLockNeeded) lock.writeLock().unlock();
else lock.readLock().unlock();
@@ -5392,7 +5168,7 @@ public class HRegion implements HeapSize
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
- private void closeBulkRegionOperation() {
+ private void closeBulkRegionOperation(){
if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
else lock.readLock().unlock();
}
@@ -5403,7 +5179,7 @@ public class HRegion implements HeapSize
*/
private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
if (numPutsWithoutWAL.getAndIncrement() == 0) {
- LOG.info("writing data to region " + this +
+ LOG.info("writing data to region " + this +
" with WAL disabled. Data may be lost in the event of a crash.");
}
@@ -5417,33 +5193,6 @@ public class HRegion implements HeapSize
dataInMemoryWithoutWAL.addAndGet(putSize);
}
- private void lock(final Lock lock)
- throws RegionTooBusyException, InterruptedIOException {
- lock(lock, 1);
- }
-
- /**
- * Try to acquire a lock. Throw RegionTooBusyException
- * if failed to get the lock in time. Throw InterruptedIOException
- * if interrupted while waiting for the lock.
- */
- private void lock(final Lock lock, final int multiplier)
- throws RegionTooBusyException, InterruptedIOException {
- try {
- final long waitTime = Math.min(maxBusyWaitDuration,
- busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
- if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
- throw new RegionTooBusyException(
- "failed to get a lock in " + waitTime + "ms");
- }
- } catch (InterruptedException ie) {
- LOG.info("Interrupted while waiting for a lock");
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- }
- }
-
/**
* Calls sync with the given transaction ID if the region's table is not
* deferring it.
@@ -5483,6 +5232,7 @@ public class HRegion implements HeapSize
}
};
+
/**
* Facility for dumping and compacting catalog tables.
* Only does catalog tables since these are only tables we for sure know
@@ -5515,11 +5265,11 @@ public class HRegion implements HeapSize
final HLog log = new HLog(fs, logdir, oldLogDir, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
- } finally {
+ } finally {
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
- }
+ }
}
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Dec 23 19:34:53 2012
@@ -44,7 +44,6 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,7 +96,6 @@ import org.apache.hadoop.hbase.client.Mu
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
@@ -167,7 +165,6 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
-import org.joda.time.field.MillisDurationField;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -236,7 +233,7 @@ public class HRegionServer implements HR
// Server to handle client requests. Default access so can be accessed by
// unit tests.
RpcServer rpcServer;
-
+
// Server to handle client requests.
private HBaseServer server;
@@ -366,8 +363,6 @@ public class HRegionServer implements HR
*/
private ClusterId clusterId = null;
- private RegionServerCoprocessorHost rsHost;
-
/**
* Starts a HRegionServer at the default location
*
@@ -439,10 +434,6 @@ public class HRegionServer implements HR
this.rpcServer.setQosFunction(new QosFunction());
this.startcode = System.currentTimeMillis();
- // login the zookeeper client principal (if using security)
- ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
- "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
-
// login the server principal (if using secure Hadoop)
User.login(this.conf, "hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", this.isa.getHostName());
@@ -1022,7 +1013,6 @@ public class HRegionServer implements HR
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
- this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
startServiceThreads();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
@@ -1030,7 +1020,6 @@ public class HRegionServer implements HR
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
isOnline = true;
} catch (Throwable e) {
- LOG.warn("Exception in region server : ", e);
this.isOnline = false;
stop("Failed initialization");
throw convertThrowableToIOE(cleanup(e, "Failed init"),
@@ -1106,7 +1095,8 @@ public class HRegionServer implements HR
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
totalStaticIndexSizeKB, totalStaticBloomSizeKB,
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
- totalCompactingKVs, currentCompactedKVs);
+ totalCompactingKVs, currentCompactedKVs,
+ r.getCoprocessorHost().getCoprocessors());
}
/**
@@ -1586,7 +1576,6 @@ public class HRegionServer implements HR
this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
this.getConfiguration(), this.getServerName().toString());
splitLogWorker.start();
-
}
/**
@@ -1654,15 +1643,10 @@ public class HRegionServer implements HR
@Override
public void stop(final String msg) {
- try {
- this.rsHost.preStop(msg);
- this.stopped = true;
- LOG.info("STOPPED: " + msg);
- // Wakes run() if it is sleeping
- sleeper.skipSleepCycle();
- } catch (IOException exp) {
- LOG.warn("The region server did not stop", exp);
- }
+ this.stopped = true;
+ LOG.info("STOPPED: " + msg);
+ // Wakes run() if it is sleeping
+ sleeper.skipSleepCycle();
}
public void waitForServerOnline(){
@@ -2446,32 +2430,23 @@ public class HRegionServer implements HR
}
}
- MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
- region.startRegionOperation();
- try {
- int i = 0;
- synchronized(s) {
- for (; i < nbRows
- && currentScanResultSize < maxScannerResultSize; i++) {
- // Collect values to be returned here
- boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
- if (!values.isEmpty()) {
- for (KeyValue kv : values) {
- currentScanResultSize += kv.heapSize();
- }
- results.add(new Result(values));
- }
- if (!moreRows) {
- break;
- }
- values.clear();
+ for (int i = 0; i < nbRows
+ && currentScanResultSize < maxScannerResultSize; i++) {
+ requestCount.incrementAndGet();
+ // Collect values to be returned here
+ boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
+ if (!values.isEmpty()) {
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
}
+ results.add(new Result(values));
}
- requestCount.addAndGet(i);
- region.readRequestsCount.add(i);
- } finally {
- region.closeRegionOperation();
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
}
+
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
@@ -2614,9 +2589,6 @@ public class HRegionServer implements HR
return -1;
}
- /**
- * @deprecated {@link RowLock} and associated operations are deprecated.
- */
public long lockRow(byte[] regionName, byte[] row) throws IOException {
checkOpen();
NullPointerException npe = null;
@@ -2633,9 +2605,6 @@ public class HRegionServer implements HR
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preLockRow(regionName, row);
- }
Integer r = region.obtainRowLock(row);
long lockId = addRowLock(r, region);
LOG.debug("Row lock " + lockId + " explicitly acquired by client");
@@ -2679,9 +2648,6 @@ public class HRegionServer implements HR
return rl;
}
- /**
- * @deprecated {@link RowLock} and associated operations are deprecated.
- */
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public void unlockRow(byte[] regionName, long lockId) throws IOException {
@@ -2700,9 +2666,6 @@ public class HRegionServer implements HR
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preUnLockRow(regionName, lockId);
- }
String lockName = String.valueOf(lockId);
Integer r = rowlocks.remove(lockName);
if (r == null) {
@@ -2879,11 +2842,6 @@ public class HRegionServer implements HR
final int versionOfClosingNode)
throws IOException {
checkOpen();
- //Check for permissions to close.
- HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
- if (actualRegion.getCoprocessorHost() != null) {
- actualRegion.getCoprocessorHost().preClose(false);
- }
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode);
boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
@@ -2931,17 +2889,6 @@ public class HRegionServer implements HR
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk, final int versionOfClosingNode) {
-
- HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
- if ((actualRegion != null) && (actualRegion.getCoprocessorHost() !=null)){
- try {
- actualRegion.getCoprocessorHost().preClose(abort);
- } catch (IOException e) {
- LOG.warn(e);
- return false;
- }
- }
-
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
@@ -3642,10 +3589,6 @@ public class HRegionServer implements HR
return this.zooKeeper;
}
- public RegionServerCoprocessorHost getCoprocessorHost(){
- return this.rsHost;
- }
-
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
return this.regionsInTransitionInRS;
@@ -3823,13 +3766,8 @@ public class HRegionServer implements HR
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() {
- TreeSet<String> coprocessors = new TreeSet<String>(
- this.hlog.getCoprocessorHost().getCoprocessors());
- Collection<HRegion> regions = getOnlineRegionsLocalContext();
- for (HRegion region: regions) {
- coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
- }
- return coprocessors.toArray(new String[0]);
+ HServerLoad hsl = buildServerLoad();
+ return hsl == null? null: hsl.getCoprocessors();
}
/**
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sun Dec 23 19:34:53 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -264,7 +265,7 @@ public class RegionCoprocessorHost
/**
* Invoked before a region open
*/
- public void preOpen(){
+ public void preOpen() {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -284,7 +285,7 @@ public class RegionCoprocessorHost
/**
* Invoked after a region open
*/
- public void postOpen(){
+ public void postOpen() {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -305,7 +306,7 @@ public class RegionCoprocessorHost
* Invoked before a region is closed
* @param abortRequested true if the server is aborting
*/
- public void preClose(boolean abortRequested) throws IOException {
+ public void preClose(boolean abortRequested) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -313,7 +314,7 @@ public class RegionCoprocessorHost
try {
((RegionObserver)env.getInstance()).preClose(ctx, abortRequested);
} catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
+ handleCoprocessorThrowableNoRethrow(env, e);
}
}
}
@@ -323,7 +324,7 @@ public class RegionCoprocessorHost
* Invoked after a region is closed
* @param abortRequested true if the server is aborting
*/
- public void postClose(boolean abortRequested){
+ public void postClose(boolean abortRequested) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -1482,31 +1483,5 @@ public class RegionCoprocessorHost
return hasLoaded;
}
-
- public void preLockRow(byte[] regionName, byte[] row) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- ctx = ObserverContext.createAndPrepare(env, ctx);
- ((RegionObserver) env.getInstance()).preLockRow(ctx, regionName, row);
- if (ctx.shouldComplete()) {
- break;
- }
- }
- }
- }
-
- public void preUnLockRow(byte[] regionName, long lockId) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- ctx = ObserverContext.createAndPrepare(env, ctx);
- ((RegionObserver) env.getInstance()).preUnlockRow(ctx, regionName, lockId);
- if (ctx.shouldComplete()) {
- break;
- }
- }
- }
- }
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sun Dec 23 19:34:53 2012
@@ -20,10 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.List;
-
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
/**
* RegionScanner describes iterators over rows in an HRegion.
@@ -52,50 +49,4 @@ public interface RegionScanner extends I
*/
public boolean reseek(byte[] row) throws IOException;
- /**
- * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl}
- */
- public long getMvccReadPoint();
-
- /**
- * Grab the next row's worth of values with the default limit on the number of values
- * to return.
- * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
- * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
- * See {@link #nextRaw(List, int, String)}
- * @param result return output array
- * @param metric the metric name
- * @return true if more rows exist after this one, false if scanner is done
- * @throws IOException e
- */
- public boolean nextRaw(List<KeyValue> result, String metric) throws IOException;
-
- /**
- * Grab the next row's worth of values with a limit on the number of values
- * to return.
- * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
- * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
- * Example:
- * <code><pre>
- * HRegion region = ...;
- * RegionScanner scanner = ...
- * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
- * region.startRegionOperation();
- * try {
- * synchronized(scanner) {
- * ...
- * boolean moreRows = scanner.nextRaw(values);
- * ...
- * }
- * } finally {
- * region.closeRegionOperation();
- * }
- * </pre></code>
- * @param result return output array
- * @param limit limit on row count to get
- * @param metric the metric name
- * @return true if more rows exist after this one, false if scanner is done
- * @throws IOException e
- */
- public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sun Dec 23 19:34:53 2012
@@ -82,8 +82,6 @@ public class ScanQueryMatcher {
/* row is not private for tests */
/** Row the query is on */
byte [] row;
- int rowOffset;
- short rowLength;
/**
* Oldest put in any of the involved store files
@@ -224,7 +222,7 @@ public class ScanQueryMatcher {
short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
offset += Bytes.SIZEOF_SHORT;
- int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
+ int ret = this.rowComparator.compareRows(row, 0, row.length,
bytes, offset, rowLength);
if (ret <= -1) {
return MatchCode.DONE;
@@ -387,10 +385,8 @@ public class ScanQueryMatcher {
* Set current row
* @param row
*/
- public void setRow(byte [] row, int offset, short length) {
+ public void setRow(byte [] row) {
this.row = row;
- this.rowOffset = offset;
- this.rowLength = length;
reset();
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Dec 23 19:34:53 2012
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -131,6 +132,9 @@ public class Store extends SchemaConfigu
private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final String storeNameStr;
+ private CompactionProgress progress;
+ private final int compactionKVMax;
private final boolean verifyBulkLoads;
/* The default priority for user-specified compaction requests.
@@ -154,6 +158,10 @@ public class Store extends SchemaConfigu
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final int blocksize;
+ /** Compression algorithm for flush files and minor compaction */
+ private final Compression.Algorithm compression;
+ /** Compression algorithm for major compaction */
+ private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */
@@ -163,8 +171,6 @@ public class Store extends SchemaConfigu
// Comparing KeyValues
final KeyValue.KVComparator comparator;
- private final Compactor compactor;
-
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -179,16 +185,25 @@ public class Store extends SchemaConfigu
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration conf)
throws IOException {
- super(conf, region.getRegionInfo().getTableNameAsString(),
+ super(conf, region.getTableDesc().getNameAsString(),
Bytes.toString(family.getName()));
- HRegionInfo info = region.getRegionInfo();
+ HRegionInfo info = region.regionInfo;
this.fs = fs;
- Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
- this.homedir = createStoreHomeDir(this.fs, p);
+ this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+ if (!this.fs.exists(this.homedir)) {
+ if (!this.fs.mkdirs(this.homedir))
+ throw new IOException("Failed create of: " + this.homedir.toString());
+ }
this.region = region;
this.family = family;
this.conf = conf;
this.blocksize = family.getBlocksize();
+ this.compression = family.getCompression();
+ // avoid overriding compression setting for major compactions if the user
+ // has not specified it separately
+ this.compactionCompression =
+ (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
+ family.getCompactionCompression() : this.compression;
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -213,6 +228,7 @@ public class Store extends SchemaConfigu
"ms in store " + this);
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
+ this.storeNameStr = getColumnFamilyName();
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
@@ -229,8 +245,10 @@ public class Store extends SchemaConfigu
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
+ this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
+ this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
+ false);
if (Store.closeCheckInterval == 0) {
Store.closeCheckInterval = conf.getInt(
@@ -242,47 +260,6 @@ public class Store extends SchemaConfigu
this.checksumType = getChecksumType(conf);
// initialize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
- // Create a compaction tool instance
- this.compactor = new Compactor(this.conf);
- }
-
- /**
- * @param family
- * @return
- */
- long getTTL(final HColumnDescriptor family) {
- // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
- long ttl = family.getTimeToLive();
- if (ttl == HConstants.FOREVER) {
- // Default is unlimited ttl.
- ttl = Long.MAX_VALUE;
- } else if (ttl == -1) {
- ttl = Long.MAX_VALUE;
- } else {
- // Second -> ms adjust for user data
- ttl *= 1000;
- }
- return ttl;
- }
-
- /**
- * Create this store's homedir
- * @param fs
- * @param homedir
- * @return Return <code>homedir</code>
- * @throws IOException
- */
- Path createStoreHomeDir(final FileSystem fs,
- final Path homedir) throws IOException {
- if (!fs.exists(homedir)) {
- if (!fs.mkdirs(homedir))
- throw new IOException("Failed create of: " + homedir.toString());
- }
- return homedir;
- }
-
- FileSystem getFileSystem() {
- return this.fs;
}
/**
@@ -343,7 +320,7 @@ public class Store extends SchemaConfigu
* Return the directory in which this store stores its
* StoreFiles
*/
- Path getHomedir() {
+ public Path getHomedir() {
return homedir;
}
@@ -362,10 +339,6 @@ public class Store extends SchemaConfigu
this.dataBlockEncoder = blockEncoder;
}
- FileStatus [] getStoreFiles() throws IOException {
- return FSUtils.listStatus(this.fs, this.homedir, null);
- }
-
/**
* Creates an unsorted list of StoreFile loaded in parallel
* from the given directory.
@@ -373,7 +346,7 @@ public class Store extends SchemaConfigu
*/
private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
- FileStatus files[] = getStoreFiles();
+ FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
if (files == null || files.length == 0) {
return results;
@@ -664,7 +637,7 @@ public class Store extends SchemaConfigu
storeFileCloserThreadPool.shutdownNow();
}
}
- LOG.info("Closed " + this);
+ LOG.debug("closed " + this.storeNameStr);
return result;
} finally {
this.lock.writeLock().unlock();
@@ -750,7 +723,6 @@ public class Store extends SchemaConfigu
scanner = cpScanner;
}
try {
- int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
@@ -764,7 +736,7 @@ public class Store extends SchemaConfigu
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
- hasMore = scanner.next(kvs, compactionKVMax);
+ hasMore = scanner.next(kvs, this.compactionKVMax);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
@@ -856,7 +828,7 @@ public class Store extends SchemaConfigu
*/
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException {
- return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+ return createWriterInTmp(maxKeyCount, this.compression, false);
}
/*
@@ -1009,12 +981,16 @@ public class Store extends SchemaConfigu
* @param cr
* compaction details obtained from requestCompaction()
* @throws IOException
- * @return Storefile we compacted into or null if we failed or opted out early.
*/
- StoreFile compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) return null;
- Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
+ void compact(CompactionRequest cr) throws IOException {
+ if (cr == null || cr.getFiles().isEmpty()) {
+ return;
+ }
+ Preconditions.checkArgument(cr.getStore().toString()
+ .equals(this.toString()));
+
List<StoreFile> filesToCompact = cr.getFiles();
+
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
@@ -1026,26 +1002,19 @@ public class Store extends SchemaConfigu
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this + " of "
+ + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
try {
- StoreFile.Writer writer =
- this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
+ StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
+ maxId);
// Move the compaction into place.
- if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
- sf = completeCompaction(filesToCompact, writer);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
- }
- } else {
- // Create storefile around what we wrote with a reader on it.
- sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- sf.createReader();
+ sf = completeCompaction(filesToCompact, writer);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postCompact(this, sf);
}
} finally {
synchronized (filesCompacting) {
@@ -1054,7 +1023,7 @@ public class Store extends SchemaConfigu
}
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this + " of "
+ + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " +
(sf == null ? "none" : sf.getPath().getName()) +
@@ -1062,7 +1031,6 @@ public class Store extends SchemaConfigu
StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is "
+ StringUtils.humanReadableInt(storeSize));
- return sf;
}
/**
@@ -1102,8 +1070,7 @@ public class Store extends SchemaConfigu
try {
// Ready to go. Have list of files to compact.
- StoreFile.Writer writer =
- this.compactor.compact(this, filesToCompact, isMajor, maxId);
+ StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
@@ -1152,10 +1119,10 @@ public class Store extends SchemaConfigu
}
/** getter for CompactionProgress object
- * @return CompactionProgress object; can be null
+ * @return CompactionProgress object
*/
public CompactionProgress getCompactionProgress() {
- return this.compactor.getProgress();
+ return this.progress;
}
/*
@@ -1207,19 +1174,19 @@ public class Store extends SchemaConfigu
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this +
+ LOG.debug("Skipping major compaction of " + this.storeNameStr +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this +
+ LOG.debug("Major compaction triggered on store " + this.storeNameStr +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this +
+ LOG.debug("Major compaction triggered on store " + this.storeNameStr +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
@@ -1409,12 +1376,12 @@ public class Store extends SchemaConfigu
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
- if (pos != 0) compactSelection.clearSubList(0, pos);
+ compactSelection.clearSubList(0, pos);
}
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this + ": no store files to compact");
+ this.storeNameStr + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
@@ -1501,7 +1468,7 @@ public class Store extends SchemaConfigu
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this
+ LOG.debug("Skipped compaction of " + this.storeNameStr
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
@@ -1528,6 +1495,149 @@ public class Store extends SchemaConfigu
}
/**
+   * Do a minor/major compaction on an explicit set of storefiles in a Store.
+   * Uses the scan infrastructure to make it easy.
+   * <p>
+   * Store files whose reader is null are skipped (with a warning) while sizing
+   * the output and computing the earliest put timestamp. If the region stops
+   * accepting writes while the compaction is running, the partially written
+   * output file is closed and deleted and the compaction is aborted.
+   *
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @param maxId Readers maximum sequence id.
+   * @return Product of compaction or null if all cells expired or deleted and
+   *         nothing made it through the compaction, or if the coprocessor
+   *         preCompact hook returned a null scanner.
+   * @throws IOException on read/write failure; an InterruptedIOException is
+   *         thrown when the compaction is aborted because region writes were
+   *         disabled.
+   */
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+      final boolean majorCompaction, final long maxId)
+      throws IOException {
+    // calculate maximum key count after compaction (for blooms)
+    int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file : filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r == null) {
+        // Nothing can be read from this file here; skipping it also keeps the
+        // major-compaction branch below from dereferencing a null reader.
+        LOG.warn("Null reader for " + file);
+        continue;
+      }
+      // NOTE: getFilterEntries could cause under-sized blooms if the user
+      // switches bloom type (e.g. from ROW to ROWCOL)
+      long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
+          ? r.getFilterEntries() : r.getEntries();
+      maxKeyCount += keyCount;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compacting " + file +
+            ", keycount=" + keyCount +
+            ", bloomtype=" + r.getBloomFilterType().toString() +
+            ", size=" + StringUtils.humanReadableInt(r.length()) +
+            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
+      }
+      // For major compactions calculate the earliest put timestamp
+      // of all involved storefiles. This is used to remove
+      // family delete marker during the compaction.
+      if (majorCompaction) {
+        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // there's a file with no information, must be an old one
+          // assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+    }
+
+    // keep track of compaction progress
+    progress = new CompactionProgress(maxKeyCount);
+
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners = StoreFileScanner
+        .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = region.getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        if (getHRegion().getCoprocessorHost() != null) {
+          scanner = getHRegion()
+              .getCoprocessorHost()
+              .preCompactScannerOpen(this, scanners,
+                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+        }
+        if (scanner == null) {
+          Scan scan = new Scan();
+          scan.setMaxVersions(getFamily().getMaxVersions());
+          /* Include deletes, unless we are doing a major compaction */
+          scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
+              majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+              smallestReadPoint, earliestPutTs);
+        }
+        if (getHRegion().getCoprocessorHost() != null) {
+          InternalScanner cpScanner =
+              getHRegion().getCoprocessorHost().preCompact(this, scanner);
+          // NULL scanner returned from coprocessor hooks means skip normal processing
+          if (cpScanner == null) {
+            return null;
+          }
+          scanner = cpScanner;
+        }
+
+        int bytesWritten = 0;
+        // since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, this.compactionKVMax);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
+                true);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
+
+              // check periodically to see if a system stop is requested
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  if (!this.region.areWritesEnabled()) {
+                    writer.close();
+                    fs.delete(writer.getPath(), false);
+                    // The output is closed and deleted; drop the reference so
+                    // the outer finally does not append metadata to, or close,
+                    // the writer a second time and mask the exception below.
+                    writer = null;
+                    throw new InterruptedIOException(
+                        "Aborting compaction of store " + this +
+                        " in region " + this.region +
+                        " because user requested stop.");
+                  }
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      // writer is non-null only when output was produced and not aborted
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
+  }
+
+ /**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
*
@@ -1631,7 +1741,7 @@ public class Store extends SchemaConfigu
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this +
+ LOG.error("Failed replacing compacted files in " + this.storeNameStr +
". Compacted file is " + (result == null? "none": result.toString()) +
". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e);
@@ -1917,7 +2027,7 @@ public class Store extends SchemaConfigu
return mk.getRow();
}
} catch(IOException e) {
- LOG.warn("Failed getting store size for " + this, e);
+ LOG.warn("Failed getting store size for " + this.storeNameStr, e);
} finally {
this.lock.readLock().unlock();
}
@@ -1970,7 +2080,7 @@ public class Store extends SchemaConfigu
@Override
public String toString() {
- return getColumnFamilyName();
+ return this.storeNameStr;
}
/**
@@ -2086,7 +2196,7 @@ public class Store extends SchemaConfigu
}
HRegionInfo getHRegionInfo() {
- return this.region.getRegionInfo();
+ return this.region.regionInfo;
}
/**
@@ -2214,8 +2324,8 @@ public class Store extends SchemaConfigu
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Dec 23 19:34:53 2012
@@ -320,7 +320,7 @@ public class StoreFile extends SchemaCon
* @return Calculated path to parent region file.
* @throws IOException
*/
- public static Path getReferredToFile(final Path p) {
+ static Path getReferredToFile(final Path p) {
Matcher m = REF_NAME_PARSER.matcher(p.getName());
if (m == null || !m.matches()) {
LOG.warn("Failed match of store file name " + p.toString());
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Dec 23 19:34:53 2012
@@ -340,11 +340,8 @@ public class StoreScanner extends NonLaz
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
- byte[] row = peeked.getBuffer();
- int offset = peeked.getRowOffset();
- short length = peeked.getRowLength();
- if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
- matcher.setRow(row, offset, length);
+ if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
+ matcher.setRow(peeked.getRow());
}
KeyValue kv;
@@ -524,12 +521,9 @@ public class StoreScanner extends NonLaz
if (kv == null) {
kv = lastTopKey;
}
- byte[] row = kv.getBuffer();
- int offset = kv.getRowOffset();
- short length = kv.getRowLength();
- if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
+ if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
matcher.reset();
- matcher.setRow(row, offset, length);
+ matcher.setRow(kv.getRow());
}
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Sun Dec 23 19:34:53 2012
@@ -49,4 +49,5 @@ public class CompactionProgress {
public float getProgressPct() {
// Returns compacted / total as a fraction.
// NOTE(review): the counters look integral (currentCompactedKVs is bumped
// with ++ and the total is built from an int key count) - confirm against
// the field declarations. Widen to float BEFORE dividing so the quotient is
// not truncated to a whole number; the cast is a no-op if the fields are
// already floating-point.
return (float) currentCompactedKVs / totalCompactingKVs;
}
+
}
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Sun Dec 23 19:34:53 2012
@@ -143,12 +143,7 @@ public class Compressor {
// the status byte also acts as the higher order byte of the dictionary
// entry
short dictIdx = toShort(status, in.readByte());
- byte[] entry;
- try {
- entry = dict.getEntry(dictIdx);
- } catch (Exception ex) {
- throw new IOException("Unable to uncompress the log entry", ex);
- }
+ byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index "
+ dictIdx);
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sun Dec 23 19:34:53 2012
@@ -167,7 +167,6 @@ public class HLog implements Syncable {
Entry next(Entry reuse) throws IOException;
void seek(long pos) throws IOException;
long getPosition() throws IOException;
- void reset() throws IOException;
}
public interface Writer {
@@ -696,18 +695,15 @@ public class HLog implements Syncable {
/**
* Get a reader for the WAL.
- * The proper way to tail a log that can be under construction is to first use this method
- * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
- * take care of keeping implementation-specific context (like compression).
* @param fs
* @param path
* @param conf
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
- public static Reader getReader(final FileSystem fs, final Path path,
- Configuration conf)
- throws IOException {
+ public static Reader getReader(final FileSystem fs,
+ final Path path, Configuration conf)
+ throws IOException {
try {
if (logReaderClass == null) {
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Sun Dec 23 19:34:53 2012
@@ -139,17 +139,15 @@ public class SequenceFileLogReader imple
Configuration conf;
WALReader reader;
- FileSystem fs;
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
- boolean emptyCompressionContext = true;
/**
* Compression context to use reading. Can be null if no compression.
*/
- protected CompressionContext compressionContext = null;
+ private CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@@ -175,7 +173,6 @@ public class SequenceFileLogReader imple
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
- this.fs = fs;
// If compression is enabled, new dictionaries are created here.
boolean compression = reader.isWALCompressionEnabled();
@@ -240,22 +237,11 @@ public class SequenceFileLogReader imple
throw addFileInfoToException(ioe);
}
edit++;
- if (compressionContext != null && emptyCompressionContext) {
- emptyCompressionContext = false;
- }
return b? e: null;
}
@Override
public void seek(long pos) throws IOException {
- if (compressionContext != null && emptyCompressionContext) {
- while (next() != null) {
- if (getPosition() == pos) {
- emptyCompressionContext = false;
- break;
- }
- }
- }
try {
reader.seek(pos);
} catch (IOException ioe) {
@@ -300,11 +286,4 @@ public class SequenceFileLogReader imple
return ioe;
}
-
- @Override
- public void reset() throws IOException {
- // Resetting the reader lets us see newly added data if the file is being written to
- // We also keep the same compressionContext which was previously populated for this file
- reader = new WALReader(fs, path, conf);
- }
}
\ No newline at end of file