You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/28 21:41:55 UTC
svn commit: r798679 - in /hadoop/hbase/trunk: ./
src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/io/hfile/ src/...
Author: stack
Date: Tue Jul 28 19:41:54 2009
New Revision: 798679
URL: http://svn.apache.org/viewvc?rev=798679&view=rev
Log:
HBASE-1671 HBASE-1609 broke scanners riding across splits
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Jul 28 19:41:54 2009
@@ -295,6 +295,7 @@
(Tim Sell and Ryan Rawson via Stack)
HBASE-1703 ICVs across /during a flush can cause multiple keys with the
same TS (bad)
+ HBASE-1671 HBASE-1609 broke scanners riding across splits
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java Tue Jul 28 19:41:54 2009
@@ -31,9 +31,8 @@
private TransactionState transactionState;
TransactionScannerCallable(final TransactionState transactionState,
- final HConnection connection, final byte[] tableName,
- final byte[] startRow, Scan scan) {
- super(connection, tableName, startRow, scan);
+ final HConnection connection, final byte[] tableName, Scan scan) {
+ super(connection, tableName, scan);
this.transactionState = transactionState;
}
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java Tue Jul 28 19:41:54 2009
@@ -186,7 +186,7 @@
final byte[] localStartKey, int caching) {
TransactionScannerCallable t =
new TransactionScannerCallable(transactionState, getConnection(),
- getTableName(), getScan().getStartRow(), getScan());
+ getTableName(), getScan());
t.setCaching(caching);
return t;
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java Tue Jul 28 19:41:54 2009
@@ -42,4 +42,12 @@
public DoNotRetryIOException(String message) {
super(message);
}
+
+ /**
+ * @param message
+ * @param cause
+ */
+ public DoNotRetryIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Jul 28 19:41:54 2009
@@ -65,8 +65,6 @@
* Used by {@link HTable} and {@link HBaseAdmin}
*/
public class HConnectionManager implements HConstants {
- private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
-
/*
* Not instantiable.
*/
@@ -94,7 +92,6 @@
if (connection == null) {
connection = new TableServers(conf);
HBASE_INSTANCES.put(conf, connection);
- LOG.debug("Created new HBASE_INSTANCES");
}
}
return connection;
@@ -131,7 +128,7 @@
/* Encapsulates finding the servers for an HBase instance */
private static class TableServers implements ServerConnection, HConstants, Watcher {
- private static final Log LOG = LogFactory.getLog(TableServers.class);
+ static final Log LOG = LogFactory.getLog(TableServers.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
private final long pause;
private final int numRetries;
@@ -353,8 +350,7 @@
MetaScannerVisitor visitor = new MetaScannerVisitor() {
public boolean processRow(Result result) throws IOException {
try {
- byte[] value =
- result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
+ byte[] value = result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
HRegionInfo info = null;
if (value != null) {
info = Writables.getHRegionInfo(value);
@@ -411,9 +407,7 @@
scan.addColumn(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
ScannerCallable s = new ScannerCallable(this,
(Bytes.equals(tableName, HConstants.META_TABLE_NAME) ?
- HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME),
- scan.getStartRow(),
- scan);
+ HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), scan);
try {
// Open scanner
getRegionServerWithRetries(s);
@@ -542,7 +536,7 @@
*/
private HRegionLocation locateRegionInMeta(final byte [] parentTable,
final byte [] tableName, final byte [] row, boolean useCache)
- throws IOException{
+ throws IOException {
HRegionLocation location = null;
// If supposed to be using the cache, then check it for a possible hit.
// Otherwise, delete any existing cached location so it won't interfere.
@@ -969,7 +963,7 @@
throw (DoNotRetryIOException) t;
}
}
- return null;
+ return null;
}
private HRegionLocation
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Tue Jul 28 19:41:54 2009
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -36,6 +37,7 @@
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
@@ -54,7 +56,6 @@
/**
* Used to communicate with a single HBase table
* TODO: checkAndSave in oldAPI
- * TODO: Converting filters
* TODO: Regex deletes.
*/
public class HTable {
@@ -1778,13 +1779,18 @@
*/
protected class ClientScanner implements ResultScanner {
private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
+ // HEADSUP: The scan internal start row can change as we move through table.
private Scan scan;
private boolean closed = false;
+ // Current region scanner is against. Gets cleared if current region goes
+ // wonky: e.g. if it splits on us.
private HRegionInfo currentRegion = null;
private ScannerCallable callable = null;
private final LinkedList<Result> cache = new LinkedList<Result>();
- private final int scannerCaching = HTable.this.scannerCaching;
+ private final int caching = HTable.this.scannerCaching;
private long lastNext;
+ // Keep lastResult returned successfully in case we have to reset scanner.
+ private Result lastResult = null;
protected ClientScanner(final Scan scan) {
if (CLIENT_LOG.isDebugEnabled()) {
@@ -1804,7 +1810,7 @@
}
public void initialize() throws IOException {
- nextScanner(this.scannerCaching);
+ nextScanner(this.caching);
}
protected Scan getScan() {
@@ -1814,10 +1820,12 @@
protected long getTimestamp() {
return lastNext;
}
-
+
/*
- * Gets a scanner for the next region.
- * Returns false if there are no more scanners.
+ * Gets a scanner for the next region. If this.currentRegion != null, then
+ * we will move to the endrow of this.currentRegion. Else we will get
+ * scanner at the scan.getStartRow().
+ * @param nbRows
*/
private boolean nextScanner(int nbRows) throws IOException {
// Close the previous scanner if it's open
@@ -1826,38 +1834,38 @@
getConnection().getRegionServerWithRetries(callable);
this.callable = null;
}
-
+
+ // Where to start the next scanner
+ byte [] localStartKey = null;
+
// if we're at the end of the table, then close and return false
// to stop iterating
- if (currentRegion != null) {
+ if (this.currentRegion != null) {
if (CLIENT_LOG.isDebugEnabled()) {
- CLIENT_LOG.debug("Advancing forward from region " + currentRegion);
+ CLIENT_LOG.debug("Finished with region " + this.currentRegion);
}
-
- byte [] endKey = currentRegion.getEndKey();
+ byte [] endKey = this.currentRegion.getEndKey();
if (endKey == null ||
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
filterSaysStop(endKey)) {
close();
return false;
}
- }
-
- HRegionInfo oldRegion = this.currentRegion;
- byte [] localStartKey =
- oldRegion == null ? scan.getStartRow() : oldRegion.getEndKey();
+ localStartKey = endKey;
+ } else {
+ localStartKey = this.scan.getStartRow();
+ }
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
Bytes.toStringBinary(localStartKey) + "'");
- }
-
+ }
try {
callable = getScannerCallable(localStartKey, nbRows);
- // open a scanner on the region server starting at the
+ // Open a scanner on the region server starting at the
// beginning of the region
getConnection().getRegionServerWithRetries(callable);
- currentRegion = callable.getHRegionInfo();
+ this.currentRegion = callable.getHRegionInfo();
} catch (IOException e) {
close();
throw e;
@@ -1867,8 +1875,9 @@
protected ScannerCallable getScannerCallable(byte [] localStartKey,
int nbRows) {
+ scan.setStartRow(localStartKey);
ScannerCallable s = new ScannerCallable(getConnection(),
- getTableName(), localStartKey, scan);
+ getTableName(), scan);
s.setCaching(nbRows);
return s;
}
@@ -1915,13 +1924,35 @@
}
if (cache.size() == 0) {
Result [] values = null;
- int countdown = this.scannerCaching;
+ int countdown = this.caching;
// We need to reset it if it's a new callable that was created
// with a countdown in nextScanner
- callable.setCaching(this.scannerCaching);
+ callable.setCaching(this.caching);
+ // This flag is set when we want to skip the result returned. We do
+ // this when we reset scanner because it split under us.
+ boolean skipFirst = false;
do {
try {
values = getConnection().getRegionServerWithRetries(callable);
+ if (skipFirst) {
+ skipFirst = false;
+ // Reget.
+ values = getConnection().getRegionServerWithRetries(callable);
+ }
+ } catch (DoNotRetryIOException e) {
+ Throwable cause = e.getCause();
+ if (cause == null || !(cause instanceof NotServingRegionException)) {
+ throw e;
+ }
+ // Else, its signal from depths of ScannerCallable that we got an
+ // NSRE on a next and that we need to reset the scanner.
+ this.scan.setStartRow(this.lastResult.getRow());
+ // Clear region as flag to nextScanner to use this.scan.startRow.
+ this.currentRegion = null;
+ // Skip first row returned. We already let it out on previous
+ // invocation.
+ skipFirst = true;
+ continue;
} catch (IOException e) {
if (e instanceof UnknownScannerException &&
lastNext + scannerTimeout < System.currentTimeMillis()) {
@@ -1936,6 +1967,7 @@
for (Result rs : values) {
cache.add(rs);
countdown--;
+ this.lastResult = rs;
}
}
} while (countdown > 0 && nextScanner(countdown));
@@ -1965,7 +1997,7 @@
}
return resultSets.toArray(new Result[resultSets.size()]);
}
-
+
public void close() {
if (callable != null) {
callable.setClose();
@@ -1998,7 +2030,7 @@
return next != null;
} catch (IOException e) {
throw new RuntimeException(e);
- }
+ }
}
return true;
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java Tue Jul 28 19:41:54 2009
@@ -49,7 +49,7 @@
ScannerCallable callable = null;
do {
Scan scan = new Scan(startRow).addFamily(CATALOG_FAMILY);
- callable = new ScannerCallable(connection, META_TABLE_NAME, scan.getStartRow(), scan);
+ callable = new ScannerCallable(connection, META_TABLE_NAME, scan);
// Open scanner
connection.getRegionServerWithRetries(callable);
try {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java Tue Jul 28 19:41:54 2009
@@ -1,3 +1,4 @@
+
/**
* Copyright 2008 The Apache Software Foundation
*
@@ -22,7 +23,12 @@
import java.io.IOException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.ipc.RemoteException;
+import org.mortbay.log.Log;
/**
@@ -34,20 +40,16 @@
private boolean instantiated = false;
private boolean closed = false;
private Scan scan;
- private byte [] startRow;
private int caching = 1;
/**
* @param connection
* @param tableName
- * @param startRow
* @param scan
*/
- public ScannerCallable (HConnection connection, byte [] tableName,
- byte [] startRow, Scan scan) {
- super(connection, tableName, startRow);
+ public ScannerCallable (HConnection connection, byte [] tableName, Scan scan) {
+ super(connection, tableName, scan.getStartRow());
this.scan = scan;
- this.startRow = startRow;
}
/**
@@ -67,18 +69,42 @@
*/
public Result [] call() throws IOException {
if (scannerId != -1L && closed) {
- server.close(scannerId);
- scannerId = -1L;
+ close();
} else if (scannerId == -1L && !closed) {
- // open the scanner
- scannerId = openScanner();
+ this.scannerId = openScanner();
} else {
- Result [] rrs = server.next(scannerId, caching);
+ Result [] rrs = null;
+ try {
+ rrs = server.next(scannerId, caching);
+ } catch (IOException e) {
+ IOException ioe = null;
+ if (e instanceof RemoteException) {
+ ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
+ }
+ if (ioe != null && ioe instanceof NotServingRegionException) {
+ // Throw a DNRE so that we break out of cycle of calling NSRE
+ // when what we need is to open scanner against new location.
+ // Attach NSRE to signal client that it needs to resetup scanner.
+ throw new DoNotRetryIOException("Reset scanner", ioe);
+ }
+ }
return rrs == null || rrs.length == 0? null: rrs;
}
return null;
}
+ private void close() {
+ if (this.scannerId == -1L) {
+ return;
+ }
+ try {
+ this.server.close(this.scannerId);
+ } catch (IOException e) {
+ Log.warn("Ignore, probably already closed", e);
+ }
+ this.scannerId = -1L;
+ }
+
protected long openScanner() throws IOException {
return server.openScanner(
this.location.getRegionInfo().getRegionName(), scan);
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Jul 28 19:41:54 2009
@@ -729,7 +729,7 @@
}
protected String toStringLastKey() {
- return KeyValue.keyToString(getFirstKey());
+ return KeyValue.keyToString(getLastKey());
}
public long length() {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Tue Jul 28 19:41:54 2009
@@ -188,7 +188,7 @@
* @param listener
* @throws IOException
*/
- public HLog(final FileSystem fs, final Path dir, final Configuration conf,
+ public HLog(final FileSystem fs, final Path dir, final HBaseConfiguration conf,
final LogRollListener listener)
throws IOException {
super();
@@ -219,7 +219,7 @@
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
// Test if syncfs is available.
- this.append = conf.getBoolean("dfs.support.append", false);
+ this.append = isAppend(conf);
Method m = null;
if (this.append) {
try {
@@ -784,7 +784,7 @@
* @throws IOException
*/
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
- final FileSystem fs, final Configuration conf)
+ final FileSystem fs, final HBaseConfiguration conf)
throws IOException {
long millis = System.currentTimeMillis();
List<Path> splits = null;
@@ -833,7 +833,8 @@
* @return List of splits made.
*/
private static List<Path> splitLog(final Path rootDir,
- final FileStatus [] logfiles, final FileSystem fs, final Configuration conf)
+ final FileStatus [] logfiles, final FileSystem fs,
+ final HBaseConfiguration conf)
throws IOException {
final Map<byte [], WriterAndPath> logWriters =
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR);
@@ -848,11 +849,12 @@
// More means faster but bigger mem consumption */
int concurrentLogReads =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
-
+ // Is append supported?
+ boolean append = isAppend(conf);
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
- for(int step = 0; step < maxSteps; step++) {
+ for (int step = 0; step < maxSteps; step++) {
final Map<byte[], LinkedList<HLogEntry>> logEntries =
new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
// Stop at logfiles.length when it's the last step
@@ -867,7 +869,6 @@
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
- boolean append = conf.getBoolean("dfs.support.append", false);
recoverLog(fs, logfiles[i].getPath(), append);
SequenceFile.Reader in = null;
int count = 0;
@@ -1022,6 +1023,24 @@
}
/**
+ * @param conf
+ * @return True if append enabled and we have the syncFs in our path.
+ */
+ private static boolean isAppend(final HBaseConfiguration conf) {
+ boolean append = conf.getBoolean("dfs.support.append", false);
+ if (append) {
+ try {
+ SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+ append = true;
+ } catch (SecurityException e) {
+ } catch (NoSuchMethodException e) {
+ append = false;
+ }
+ }
+ return append;
+ }
+
+ /**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
*/
@@ -1158,10 +1177,9 @@
System.exit(-1);
}
}
- Configuration conf = new HBaseConfiguration();
+ HBaseConfiguration conf = new HBaseConfiguration();
FileSystem fs = FileSystem.get(conf);
Path baseDir = new Path(conf.get(HBASE_DIR));
-
for (int i = 1; i < args.length; i++) {
Path logPath = new Path(args[i]);
if (!fs.exists(logPath)) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jul 28 19:41:54 2009
@@ -110,7 +110,7 @@
* master as a region to close if the carrying regionserver is overloaded.
* Once set, it is never cleared.
*/
- private final AtomicBoolean closing = new AtomicBoolean(false);
+ final AtomicBoolean closing = new AtomicBoolean(false);
private final RegionHistorian historian;
//////////////////////////////////////////////////////////////////////////////
@@ -1671,8 +1671,6 @@
return this.basedir;
}
-
- //TODO
/**
* RegionScanner is an iterator through a bunch of rows in an HRegion.
* <p>
@@ -1710,9 +1708,15 @@
* Get the next row of results from this region.
* @param results list to append results to
* @return true if there are more rows, false if scanner is done
* @throws NotServingRegionException If this region is closing or closed
*/
public boolean next(List<KeyValue> results)
throws IOException {
+ if (closing.get() || closed.get()) {
+ close();
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+ " is closing=" + closing.get() + " or closed=" + closed.get());
+ }
// This method should probably be reorganized a bit... has gotten messy
KeyValue kv = this.storeHeap.peek();
if (kv == null) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul 28 19:41:54 2009
@@ -803,7 +803,7 @@
*/
private Throwable cleanup(final Throwable t, final String msg) {
if (msg == null) {
- LOG.error(RemoteExceptionHandler.checkThrowable(t));
+ LOG.error("", RemoteExceptionHandler.checkThrowable(t));
} else {
LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
}
@@ -1890,7 +1890,7 @@
public Result [] next(final long scannerId, int nbRows) throws IOException {
try {
String scannerName = String.valueOf(scannerId);
- InternalScanner s = scanners.get(scannerName);
+ InternalScanner s = this.scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
@@ -1918,6 +1918,9 @@
}
return results.toArray(new Result[0]);
} catch (Throwable t) {
+ if (t instanceof NotServingRegionException) {
+ this.scanners.remove(scannerId);
+ }
throw convertThrowableToIOE(cleanup(t));
}
}
@@ -1978,9 +1981,9 @@
boolean writeToWAL = true;
this.cacheFlusher.reclaimMemStoreMemory();
this.requestCount.incrementAndGet();
- Integer lock = getLockFromId(delete.getLockId());
+ Integer lid = getLockFromId(delete.getLockId());
HRegion region = getRegion(regionName);
- region.delete(delete, lock, writeToWAL);
+ region.delete(delete, lid, writeToWAL);
} catch(WrongRegionException ex) {
} catch (NotServingRegionException ex) {
} catch (Throwable t) {
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java Tue Jul 28 19:41:54 2009
@@ -21,7 +21,10 @@
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -34,6 +37,7 @@
* Tests forced splitting of HTable
*/
public class TestForceSplit extends HBaseClusterTestCase {
+ static final Log LOG = LogFactory.getLog(TestForceSplit.class);
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] columnName = Bytes.toBytes("a:");
@@ -44,7 +48,7 @@
}
/**
- * the test
+ * Tests forcing split from client and having scanners successfully ride over split.
* @throws Exception
* @throws IOException
*/
@@ -55,7 +59,7 @@
htd.addFamily(new HColumnDescriptor(columnName));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(htd);
- HTable table = new HTable(conf, tableName);
+ final HTable table = new HTable(conf, tableName);
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 < 'z'; b1++) {
@@ -88,31 +92,50 @@
scanner.close();
assertEquals(rowCount, rows);
+ // Have an outstanding scan going on to make sure we can scan over splits.
+ scan = new Scan();
+ scanner = table.getScanner(scan);
+ // Scan first row so we are into first region before split happens.
+ scanner.next();
+
+ final AtomicInteger count = new AtomicInteger(0);
+ Thread t = new Thread("CheckForSplit") {
+ public void run() {
+ for (int i = 0; i < 20; i++) {
+ try {
+ sleep(1000);
+ } catch (InterruptedException e) {
+ continue;
+ }
+ // check again table = new HTable(conf, tableName);
+ Map<HRegionInfo, HServerAddress> regions = null;
+ try {
+ regions = table.getRegionsInfo();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (regions == null) continue;
+ count.set(regions.size());
+ if (count.get() >= 2) break;
+ LOG.debug("Cycle waiting on split");
+ }
+ }
+ };
+ t.start();
// tell the master to split the table
admin.split(Bytes.toString(tableName));
+ t.join();
- // give some time for the split to happen
- Thread.sleep(15 * 1000);
-
- // check again table = new HTable(conf, tableName);
- m = table.getRegionsInfo();
- System.out.println("Regions after split (" + m.size() + "): " + m);
- // should have two regions now
- assertTrue(m.size() == 2);
-
// Verify row count
- scan = new Scan();
- scanner = table.getScanner(scan);
- rows = 0;
- for(Result result : scanner) {
+ rows = 1; // We counted one row above.
+ for (Result result : scanner) {
rows++;
- if(rows > rowCount) {
+ if (rows > rowCount) {
scanner.close();
- assertTrue("Have already scanned more rows than expected (" +
- rowCount + ")", false);
+ assertTrue("Scanned more than expected (" + rowCount + ")", false);
}
}
scanner.close();
assertEquals(rowCount, rows);
}
-}
+}
\ No newline at end of file