Posted to commits@hbase.apache.org by ap...@apache.org on 2008/11/21 03:37:04 UTC
svn commit: r719453 - in /hadoop/hbase/branches/0.19_on_hadoop_0.18/src:
java/org/apache/hadoop/hbase/regionserver/
test/org/apache/hadoop/hbase/regionserver/
Author: apurtell
Date: Thu Nov 20 18:37:04 2008
New Revision: 719453
URL: http://svn.apache.org/viewvc?rev=719453&view=rev
Log:
merge up to trunk (revision 719452)
Modified:
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Thu Nov 20 18:37:04 2008
@@ -244,35 +244,17 @@
return;
}
synchronized (updateLock) {
- if (this.writer != null) {
- // Close the current writer, get a new one.
- try {
- this.writer.close();
- } catch (IOException e) {
- // Failed close of log file. Means we're losing edits. For now,
- // shut ourselves down to minimize loss. Alternative is to try and
- // keep going. See HBASE-930.
- FailedLogCloseException flce =
- new FailedLogCloseException("#" + this.filenum);
- flce.initCause(e);
- throw e;
- }
- Path p = computeFilename(old_filenum);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing current log writer " + FSUtils.getPath(p));
- }
- if (filenum > 0) {
- synchronized (this.sequenceLock) {
- this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
- }
- }
- }
- old_filenum = filenum;
- filenum = System.currentTimeMillis();
- Path newPath = computeFilename(filenum);
+ // Clean up current writer.
+ Path oldFile = cleanupCurrentWriter();
+ // Create a new one.
+ this.old_filenum = this.filenum;
+ this.filenum = System.currentTimeMillis();
+ Path newPath = computeFilename(this.filenum);
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
- HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
- LOG.info("New log writer created at " + FSUtils.getPath(newPath));
+ HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
+ LOG.info((oldFile != null?
+ "Closed " + oldFile + ", entries=" + this.numEntries + ". ": "") +
+ "New log writer: " + FSUtils.getPath(newPath));
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
@@ -286,38 +268,7 @@
}
this.outputfiles.clear();
} else {
- // Get oldest edit/sequence id. If logs are older than this id,
- // then safe to remove.
- Long oldestOutstandingSeqNum =
- Collections.min(this.lastSeqWritten.values());
- // Get the set of all log files whose final ID is older than or
- // equal to the oldest pending region operation
- TreeSet<Long> sequenceNumbers =
- new TreeSet<Long>(this.outputfiles.headMap(
- (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
- // Now remove old log files (if any)
- if (LOG.isDebugEnabled()) {
- // Find region associated with oldest key -- helps debugging.
- byte [] oldestRegion = null;
- for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
- if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
- oldestRegion = e.getKey();
- break;
- }
- }
- if (LOG.isDebugEnabled() && sequenceNumbers.size() > 0) {
- LOG.debug("Found " + sequenceNumbers.size() +
- " logs to remove " +
- "using oldest outstanding seqnum of " +
- oldestOutstandingSeqNum + " from region " +
- Bytes.toString(oldestRegion));
- }
- }
- if (sequenceNumbers.size() > 0) {
- for (Long seq : sequenceNumbers) {
- deleteLogFile(this.outputfiles.remove(seq), seq);
- }
- }
+ cleanOldLogs();
}
}
this.numEntries = 0;
@@ -328,6 +279,73 @@
}
}
+ /*
+ * Clean up old commit logs.
+ * @throws IOException
+ */
+ private void cleanOldLogs() throws IOException {
+ // Get oldest edit/sequence id. If logs are older than this id,
+ // then safe to remove.
+ Long oldestOutstandingSeqNum =
+ Collections.min(this.lastSeqWritten.values());
+ // Get the set of all log files whose final ID is older than or
+ // equal to the oldest pending region operation
+ TreeSet<Long> sequenceNumbers =
+ new TreeSet<Long>(this.outputfiles.headMap(
+ (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
+ // Now remove old log files (if any)
+ if (LOG.isDebugEnabled()) {
+ // Find region associated with oldest key -- helps debugging.
+ byte [] oldestRegion = null;
+ for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
+ if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
+ oldestRegion = e.getKey();
+ break;
+ }
+ }
+ LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+ " out of total " + this.outputfiles.size() + "; " +
+ "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
+ " from region " + Bytes.toString(oldestRegion));
+ }
+ if (sequenceNumbers.size() > 0) {
+ for (Long seq : sequenceNumbers) {
+ deleteLogFile(this.outputfiles.remove(seq), seq);
+ }
+ }
+ }
+
+ /*
+ * Cleans up the current writer: closes it and adds its path to outputfiles.
+ * Presumes we're operating inside an updateLock scope.
+ * @return Path to current writer or null if none.
+ * @throws IOException
+ */
+ private Path cleanupCurrentWriter() throws IOException {
+ Path oldFile = null;
+ if (this.writer != null) {
+ // Close the current writer; the caller creates the new one.
+ try {
+ this.writer.close();
+ } catch (IOException e) {
+ // Failed close of log file. Means we're losing edits. For now,
+ // shut ourselves down to minimize loss. Alternative is to try and
+ // keep going. See HBASE-930.
+ FailedLogCloseException flce =
+ new FailedLogCloseException("#" + this.filenum);
+ flce.initCause(e);
+ throw flce;
+ }
+ oldFile = computeFilename(old_filenum);
+ if (filenum > 0) {
+ synchronized (this.sequenceLock) {
+ this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), oldFile);
+ }
+ }
+ }
+ return oldFile;
+ }
+
private void deleteLogFile(final Path p, final Long seqno) throws IOException {
LOG.info("removing old log file " + FSUtils.getPath(p) +
" whose highest sequence/edit id is " + seqno);
@@ -626,8 +644,9 @@
}
/**
- * Split up a bunch of log files, that are no longer being written to, into
- * new files, one per region. Delete the old log files when finished.
+ * Split up a bunch of regionserver commit log files that are no longer
+ * being written to, into new files, one per region, for the region to
+ * replay on startup. Delete the old log files when finished.
*
* @param rootDir qualified root directory of the HBase instance
* @param srcDir Directory of log files to split: e.g.
@@ -636,19 +655,42 @@
* @param conf HBaseConfiguration
* @throws IOException
*/
- public static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
- Configuration conf) throws IOException {
+ public static void splitLog(final Path rootDir, final Path srcDir,
+ final FileSystem fs, final Configuration conf)
+ throws IOException {
if (!fs.exists(srcDir)) {
// Nothing to do
return;
}
- FileStatus logfiles[] = fs.listStatus(srcDir);
+ FileStatus [] logfiles = fs.listStatus(srcDir);
if (logfiles == null || logfiles.length == 0) {
// Nothing to do
return;
}
- LOG.info("splitting " + logfiles.length + " log(s) in " +
+ LOG.info("Splitting " + logfiles.length + " log(s) in " +
srcDir.toString());
+ splitLog(rootDir, logfiles, fs, conf);
+ try {
+ fs.delete(srcDir, true);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ IOException io = new IOException("Cannot delete: " + srcDir);
+ io.initCause(e);
+ throw io;
+ }
+ LOG.info("log file splitting completed for " + srcDir.toString());
+ }
+
+ /*
+ * @param rootDir
+ * @param logfiles
+ * @param fs
+ * @param conf
+ * @throws IOException
+ */
+ private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
+ final FileSystem fs, final Configuration conf)
+ throws IOException {
Map<byte [], SequenceFile.Writer> logWriters =
new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
try {
@@ -743,16 +785,6 @@
w.close();
}
}
-
- try {
- fs.delete(srcDir, true);
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- IOException io = new IOException("Cannot delete: " + srcDir);
- io.initCause(e);
- throw io;
- }
- LOG.info("log file splitting completed for " + srcDir.toString());
}
/**
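A note on the cleanOldLogs() factoring above: the pruning rule itself is unchanged. A commit log file becomes deletable once its highest sequence id is at or below the oldest edit that any region has yet to flush, and the lookup is a plain SortedMap.headMap over the per-log highest ids. A minimal standalone sketch of that technique (hypothetical class and data, not code from this commit):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.TreeSet;

    public class OldLogPruneSketch {
      public static void main(String[] args) {
        // Highest sequence id in each closed log file -> its (fake) path.
        SortedMap<Long, String> outputfiles = new TreeMap<Long, String>();
        outputfiles.put(Long.valueOf(10), "hlog.10");
        outputfiles.put(Long.valueOf(20), "hlog.20");
        outputfiles.put(Long.valueOf(30), "hlog.30");
        // Oldest edit still unflushed by any region (stand-in for the
        // lastSeqWritten values).
        Long oldestOutstandingSeqNum =
          Collections.min(Arrays.asList(Long.valueOf(25), Long.valueOf(40)));
        // headMap is exclusive of its bound, so +1 keeps logs whose final id
        // exactly equals the oldest outstanding edit eligible for deletion.
        TreeSet<Long> deletable = new TreeSet<Long>(outputfiles.headMap(
          Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L)).keySet());
        for (Long seq : deletable) {
          System.out.println("safe to delete " + outputfiles.remove(seq));
        }
        // Prints hlog.10 and hlog.20; hlog.30 may still hold unflushed edits.
      }
    }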
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 20 18:37:04 2008
@@ -110,6 +110,11 @@
static final Random rand = new Random();
static final Log LOG = LogFactory.getLog(HRegion.class);
final AtomicBoolean closed = new AtomicBoolean(false);
+ /* Closing can take some time; use the closing flag to skip anything we don't
+ * want to do while closing; e.g. offering this region up to the master as a
+ * region to close if the carrying regionserver is overloaded. Once set, it
+ * is never cleared. */
+ private final AtomicBoolean closing = new AtomicBoolean(false);
private final RegionHistorian historian;
//////////////////////////////////////////////////////////////////////////////
@@ -330,6 +335,13 @@
}
/**
+ * @return True if closing process has started.
+ */
+ public boolean isClosing() {
+ return this.closing.get();
+ }
+
+ /**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
*
@@ -365,6 +377,7 @@
LOG.warn("region " + this + " already closed");
return null;
}
+ this.closing.set(true);
synchronized (splitLock) {
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
@@ -419,7 +432,6 @@
result.addAll(store.close());
}
this.closed.set(true);
-
LOG.info("Closed " + this);
return result;
} finally {
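The closing flag introduced above is a one-way latch: close() sets it before the potentially slow flush-and-close work starts, and it is never cleared, so other threads can cheaply skip a region that is on its way out (the HRegionServer hunk below uses it to filter the regions reported to the master). A rough sketch of the pattern, with illustrative names only, not the actual HRegion implementation:

    import java.util.concurrent.atomic.AtomicBoolean;

    class RegionLike {
      private final AtomicBoolean closed = new AtomicBoolean(false);
      // Set at the start of close() and never cleared.
      private final AtomicBoolean closing = new AtomicBoolean(false);

      boolean isClosing() { return this.closing.get(); }
      boolean isClosed() { return this.closed.get(); }

      void close() {
        if (this.closed.get()) {
          return; // already closed
        }
        this.closing.set(true); // visible to other threads before the slow work
        // ... flush the cache, close each store (can take a while) ...
        this.closed.set(true);
      }
    }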
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Nov 20 18:37:04 2008
@@ -296,8 +296,7 @@
long lastMsg = 0;
// Now ask master what it wants us to do and tell it what we have done
for (int tries = 0; !stopRequested.get() && isHealthy();) {
- // Try to get the root region location from the master.
- if (!haveRootRegion.get()) {
+ // Try to get the root region location from the master.
HServerAddress rootServer = hbaseMaster.getRootRegionLocation();
if (rootServer != null) {
// By setting the root region location, we bypass the wait imposed on
@@ -306,8 +305,7 @@
new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer));
haveRootRegion.set(true);
}
- }
- long now = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) {
// It has been way too long since we last reported to the master.
LOG.warn("unable to report to master for " + (now - lastMsg) +
@@ -565,7 +563,6 @@
* only called when the HRegionServer receives a kill signal.
*/
private static class ShutdownThread extends Thread {
- private final Log LOG = LogFactory.getLog(this.getClass());
private final HRegionServer instance;
/**
@@ -591,7 +588,6 @@
* compaction.
*/
private static class MajorCompactionChecker extends Chore {
- private final Log LOG = LogFactory.getLog(this.getClass());
private final HRegionServer instance;
MajorCompactionChecker(final HRegionServer h,
@@ -617,7 +613,7 @@
}
}
}
- };
+ }
/**
* Report the status of the server. A server is online once all the startup
@@ -1762,6 +1758,9 @@
ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
synchronized (onlineRegions) {
for (HRegion r : onlineRegions.values()) {
+ if (r.isClosed() || r.isClosing()) {
+ continue;
+ }
if (regions.size() < numRegionsToReport) {
regions.add(r.getRegionInfo());
} else {
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Nov 20 18:37:04 2008
@@ -908,7 +908,7 @@
this.compactionDir, this.info, family.getName(), -1L, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Started compaction of " + rdrs.size() + " file(s)" +
- (references? "(hasReferences=true)": " ") + " into " +
+ (references? ", hasReferences=true,": " ") + " into " +
FSUtils.getPath(compactedOutputFile.getMapFilePath()));
}
MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java Thu Nov 20 18:37:04 2008
@@ -21,13 +21,12 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,11 +35,12 @@
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapFile;
/**
* Scanner scans both the memcache and the HStore
*/
-class HStoreScanner implements InternalScanner {
+class HStoreScanner implements InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(HStoreScanner.class);
private InternalScanner[] scanners;
@@ -50,6 +50,15 @@
private boolean multipleMatchers = false;
private RowFilterInterface dataFilter;
private HStore store;
+ private final long timestamp;
+ private final byte [][] targetCols;
+
+ // Indices for memcache scanner and hstorefile scanner.
+ private static final int MEMS_INDEX = 0;
+ private static final int HSFS_INDEX = MEMS_INDEX + 1;
+
+ // Used around transition from no storefile to the first.
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/** Create an Scanner with a handle on the memcache and HStore files. */
@SuppressWarnings("unchecked")
@@ -64,51 +73,72 @@
this.scanners = new InternalScanner[2];
this.resultSets = new TreeMap[scanners.length];
this.keys = new HStoreKey[scanners.length];
+ // Save these args in case we need them later when handling a change in
+ // readers. See updateReaders below.
+ this.timestamp = timestamp;
+ this.targetCols = targetCols;
try {
- scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
- scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
- for (int i = 0; i < scanners.length; i++) {
- if (scanners[i].isWildcardScanner()) {
- this.wildcardMatch = true;
- }
- if (scanners[i].isMultipleMatchScanner()) {
- this.multipleMatchers = true;
- }
- }
- } catch(IOException e) {
- for (int i = 0; i < this.scanners.length; i++) {
- if(scanners[i] != null) {
- closeScanner(i);
- }
+ scanners[MEMS_INDEX] =
+ store.memcache.getScanner(timestamp, targetCols, firstRow);
+ scanners[HSFS_INDEX] =
+ new StoreFileScanner(store, timestamp, targetCols, firstRow);
+ for (int i = MEMS_INDEX; i < scanners.length; i++) {
+ checkScannerFlags(i);
}
+ } catch (IOException e) {
+ doClose();
throw e;
}
// Advance to the first key in each scanner.
// All results will match the required column-set and scanTime.
- for (int i = 0; i < scanners.length; i++) {
- keys[i] = new HStoreKey();
- resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
- if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
- closeScanner(i);
- }
+ for (int i = MEMS_INDEX; i < scanners.length; i++) {
+ setupScanner(i);
+ }
+
+ this.store.addChangedReaderObserver(this);
+ }
+
+ /*
+ * @param i Index.
+ */
+ private void checkScannerFlags(final int i) {
+ if (this.scanners[i].isWildcardScanner()) {
+ this.wildcardMatch = true;
+ }
+ if (this.scanners[i].isMultipleMatchScanner()) {
+ this.multipleMatchers = true;
+ }
+ }
+
+ /*
+ * Do scanner setup.
+ * @param i
+ * @throws IOException
+ */
+ private void setupScanner(final int i) throws IOException {
+ this.keys[i] = new HStoreKey();
+ this.resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+ if (this.scanners[i] != null && !this.scanners[i].next(this.keys[i], this.resultSets[i])) {
+ closeScanner(i);
}
}
/** @return true if the scanner is a wild card scanner */
public boolean isWildcardScanner() {
- return wildcardMatch;
+ return this.wildcardMatch;
}
/** @return true if the scanner is a multiple match scanner */
public boolean isMultipleMatchScanner() {
- return multipleMatchers;
+ return this.multipleMatchers;
}
public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
- throws IOException {
-
+ throws IOException {
+ this.lock.readLock().lock();
+ try {
// Filtered flag is set by filters. If a cell has been 'filtered out'
// -- i.e. it is not to be returned to the caller -- the flag is 'true'.
boolean filtered = true;
@@ -243,6 +273,9 @@
}
return moreToFollow;
+ } finally {
+ this.lock.readLock().unlock();
+ }
}
/** Shut down a single scanner */
@@ -261,10 +294,43 @@
}
public void close() {
- for(int i = 0; i < scanners.length; i++) {
- if(scanners[i] != null) {
+ this.store.deleteChangedReaderObserver(this);
+ doClose();
+ }
+
+ private void doClose() {
+ for (int i = MEMS_INDEX; i < scanners.length; i++) {
+ if (scanners[i] != null) {
closeScanner(i);
}
}
}
-}
+
+ // Implementation of ChangedReadersObserver
+
+ public void updateReaders() throws IOException {
+ this.lock.writeLock().lock();
+ try {
+ MapFile.Reader [] readers = this.store.getReaders();
+ if (this.scanners[HSFS_INDEX] == null && readers != null &&
+ readers.length > 0) {
+ // Presume that we went from no readers to at least one -- need to put
+ // a StoreFileScanner in place.
+ try {
+ // I think it's safe getting the key from the memcache at this stage -- it
+ // shouldn't have been flushed yet
+ this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
+ this.timestamp, this.targetCols, this.keys[MEMS_INDEX].getRow());
+ checkScannerFlags(HSFS_INDEX);
+ setupScanner(HSFS_INDEX);
+ LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
+ } catch (IOException e) {
+ doClose();
+ throw e;
+ }
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+}
\ No newline at end of file
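The locking added to HStoreScanner follows a standard read/write split: every next() call holds the read lock, so concurrent scans proceed freely, while the one-time updateReaders() swap-in of a StoreFileScanner takes the write lock and briefly excludes them. A compilable toy version of that shape (names hypothetical, not HBase code):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ReaderSwapSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private Object storeFileScanner = null; // created on first flush

      boolean next() {
        this.lock.readLock().lock(); // many scans may hold this at once
        try {
          // ... merge results from memcache and store file scanners ...
          return this.storeFileScanner != null;
        } finally {
          this.lock.readLock().unlock();
        }
      }

      // Called when the store's set of readers changes (e.g. first flush).
      void updateReaders() {
        this.lock.writeLock().lock(); // excludes all next() callers
        try {
          if (this.storeFileScanner == null) {
            this.storeFileScanner = new Object(); // stand-in for StoreFileScanner
          }
        } finally {
          this.lock.writeLock().unlock();
        }
      }
    }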
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Thu Nov 20 18:37:04 2008
@@ -61,8 +61,6 @@
}
rollLock.lock(); // Don't interrupt us. We're working
try {
- LOG.info("Rolling hlog. Number of entries: " +
- server.getLog().getNumEntries());
server.getLog().rollWriter();
} catch (FailedLogCloseException e) {
LOG.fatal("Forcing server shutdown", e);
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java Thu Nov 20 18:37:04 2008
@@ -22,25 +22,30 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.SortedMap;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.HRegionInfo;
-
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
/**
* Test of a long-lived scanner validating as we go.
*/
public class TestScanner extends HBaseTestCase {
- private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
+ private final Log LOG = LogFactory.getLog(this.getClass());
+
+ private static final byte [] FIRST_ROW =
+ HConstants.EMPTY_START_ROW;
private static final byte [][] COLS = {
HConstants.COLUMN_FAMILY
};
@@ -52,7 +57,8 @@
private static final byte [] ROW_KEY =
HRegionInfo.ROOT_REGIONINFO.getRegionName();
- private static final HRegionInfo REGION_INFO = HRegionInfo.ROOT_REGIONINFO;
+ private static final HRegionInfo REGION_INFO =
+ HRegionInfo.ROOT_REGIONINFO;
private static final long START_CODE = Long.MAX_VALUE;
@@ -84,8 +90,7 @@
/** Use a scanner to get the region info and then validate the results */
private void scan(boolean validateStartcode, String serverName)
- throws IOException {
-
+ throws IOException {
InternalScanner scanner = null;
TreeMap<byte [], Cell> results =
new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
@@ -140,7 +145,55 @@
byte [] bytes = region.get(ROW_KEY, HConstants.COL_REGIONINFO).getValue();
validateRegionInfo(bytes);
}
+
+ /**
+ * HBASE-910.
+ * @throws Exception
+ */
+ public void testScanAndConcurrentFlush() throws Exception {
+ this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+ HRegionIncommon hri = new HRegionIncommon(r);
+ try {
+ addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO));
+ int count = count(hri, -1);
+ assertEquals(count, count(hri, 100));
+ assertEquals(count, count(hri, 0));
+ assertEquals(count, count(hri, count - 1));
+ } finally {
+ this.r.close();
+ this.r.getLog().closeAndDelete();
+ shutdownDfs(cluster);
+ }
+ }
+ /*
+ * @param hri Region
+ * @param flushIndex At what row we start the flush.
+ * @return Count of rows found.
+ * @throws IOException
+ */
+ private int count(final HRegionIncommon hri, final int flushIndex)
+ throws IOException {
+ LOG.info("Taking out counting scan");
+ ScannerIncommon s = hri.getScanner(EXPLICIT_COLS,
+ HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
+ HStoreKey key = new HStoreKey();
+ SortedMap<byte [], Cell> values =
+ new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+ int count = 0;
+ while (s.next(key, values)) {
+ count++;
+ if (flushIndex == count) {
+ LOG.info("Starting flush at flush index " + flushIndex);
+ hri.flushcache();
+ LOG.info("Finishing flush");
+ }
+ }
+ s.close();
+ LOG.info("Found " + count + " items");
+ return count;
+ }
+
/** The test!
* @throws IOException
*/
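The structure of the new testScanAndConcurrentFlush is worth calling out: count all rows once with no flush, then repeat the scan while forcing a flush at different points mid-scan, and assert that every count matches. A self-contained toy version of that harness (illustrative two-list store and names; the real test drives an HRegion through HRegionIncommon):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class MidScanFlushSketch {
      static List<String> memcache = new ArrayList<String>();
      static List<String> storefiles = new ArrayList<String>();

      static void flush() { // move every in-memory row to the "store files"
        storefiles.addAll(memcache);
        memcache.clear();
      }

      static int count(int flushIndex) {
        // Snapshot both sources up front, the way a well-behaved scanner would.
        List<String> snapshot = new ArrayList<String>(storefiles);
        snapshot.addAll(memcache);
        int count = 0;
        for (Iterator<String> it = snapshot.iterator(); it.hasNext();) {
          it.next();
          count++;
          if (count == flushIndex) {
            flush(); // concurrent flush; must not change what this scan sees
          }
        }
        return count;
      }

      public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
          memcache.add("row" + i);
        }
        int baseline = count(-1); // no flush at all
        if (count(5) != baseline || count(0) != baseline) {
          throw new AssertionError("mid-scan flush changed the row count");
        }
        System.out.println("row count stable at " + baseline);
      }
    }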