You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/09/08 05:09:43 UTC
svn commit: r573777 - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/
src/test/org/apache/hadoop/hbase/
Author: jimk
Date: Fri Sep 7 20:09:42 2007
New Revision: 573777
URL: http://svn.apache.org/viewvc?rev=573777&view=rev
Log:
HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Sep 7 20:09:42 2007
@@ -27,6 +27,7 @@
HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug
(Ning Li via Stack)
HADOOP-1800 output should default utf8 encoding
+ HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri Sep 7 20:09:42 2007
@@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
@@ -102,7 +103,8 @@
long metaRescanInterval;
- final AtomicReference<HServerAddress> rootRegionLocation;
+ final AtomicReference<HServerAddress> rootRegionLocation =
+ new AtomicReference<HServerAddress>();
Lock splitLogLock = new ReentrantLock();
@@ -359,8 +361,10 @@
for (Text family: split.getTableDesc().families().keySet()) {
Path p = HStoreFile.getMapDir(fs.makeQualified(dir),
split.getRegionName(), HStoreKey.extractFamily(family));
+
// Look for reference files. Call listPaths with an anonymous
// instance of PathFilter.
+
Path [] ps = fs.listPaths(p,
new PathFilter () {
public boolean accept(Path path) {
@@ -368,7 +372,7 @@
}
}
);
-
+
if (ps != null && ps.length > 0) {
result = true;
break;
@@ -393,7 +397,7 @@
}
protected void checkAssigned(final HRegionInfo info,
- final String serverName, final long startCode) {
+ final String serverName, final long startCode) throws IOException {
// Skip region - if ...
if(info.offLine // offline
@@ -445,6 +449,7 @@
} catch (IOException e) {
LOG.warn("unable to split region server log because: ", e);
+ throw e;
}
}
@@ -512,6 +517,14 @@
// at least log it rather than go out silently.
LOG.error("Unexpected exception", e);
}
+
+ // We ran out of tries. Make sure the file system is still available
+
+ if (!FSUtils.isFileSystemAvailable(fs)) {
+ LOG.fatal("Shutting down hbase cluster: file system not available");
+ closed = true;
+ }
+
if (!closed) {
// sleep before retry
@@ -675,6 +688,13 @@
LOG.error("Unexpected exception", e);
}
+ // We ran out of tries. Make sure the file system is still available
+
+ if (!FSUtils.isFileSystemAvailable(fs)) {
+ LOG.fatal("Shutting down hbase cluster: file system not available");
+ closed = true;
+ }
+
if (!closed) {
// sleep before retry
try {
@@ -829,46 +849,56 @@
* @throws IOException
*/
public HMaster(Path dir, HServerAddress address, Configuration conf)
- throws IOException {
+ throws IOException {
+
this.closed = true;
this.dir = dir;
this.conf = conf;
this.fs = FileSystem.get(conf);
this.rand = new Random();
-
- // Make sure the root directory exists!
- if(! fs.exists(dir)) {
- fs.mkdirs(dir);
- }
-
+
Path rootRegionDir =
HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName);
LOG.info("Root region dir: " + rootRegionDir.toString());
- if (!fs.exists(rootRegionDir)) {
- LOG.info("bootstrap: creating ROOT and first META regions");
- try {
- HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
- this.conf, null);
-
- HRegion meta =
- HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
- null, null), this.dir, this.conf, null);
-
- // Add first region from the META table to the ROOT region.
-
- HRegion.addRegionToMETA(root, meta);
- root.close();
- root.getLog().closeAndDelete();
- meta.close();
- meta.getLog().closeAndDelete();
-
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ try {
+
+ // Make sure the root directory exists!
+
+ if(! fs.exists(dir)) {
+ fs.mkdirs(dir);
+ }
+
+ if (!fs.exists(rootRegionDir)) {
+ LOG.info("bootstrap: creating ROOT and first META regions");
+ try {
+ HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
+ this.conf, null);
+
+ HRegion meta =
+ HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
+ null, null), this.dir, this.conf, null);
+
+ // Add first region from the META table to the ROOT region.
+
+ HRegion.addRegionToMETA(root, meta);
+ root.close();
+ root.getLog().closeAndDelete();
+ meta.close();
+ meta.getLog().closeAndDelete();
+
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ LOG.error("bootstrap", e);
+ throw e;
}
- LOG.error("bootstrap", e);
}
+
+ } catch (IOException e) {
+ LOG.fatal("Not starting HMaster because:", e);
+ return;
}
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -898,7 +928,6 @@
// The root region
- this.rootRegionLocation = new AtomicReference<HServerAddress>();
this.rootScanned = false;
this.rootScanner = new RootScanner();
this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
@@ -1038,9 +1067,15 @@
(RemoteException) ex);
} catch (IOException e) {
+ ex = e;
LOG.warn("main processing loop: " + op.toString(), e);
}
}
+ if (!FSUtils.isFileSystemAvailable(fs)) {
+ LOG.fatal("Shutting down hbase cluster: file system not available");
+ closed = true;
+ break;
+ }
LOG.warn("Processing pending operations: " + op.toString(), ex);
try {
msgQueue.put(op);
@@ -2627,6 +2662,13 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
+ // No retries left
+
+ if (!FSUtils.isFileSystemAvailable(fs)) {
+ LOG.fatal("Shutting down hbase cluster: file system not available");
+ closed = true;
+ }
+
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
@@ -2692,7 +2734,7 @@
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
- throws IOException {
+ throws IOException {
// Process regions not being served
@@ -2719,32 +2761,9 @@
updateRegionInfo(b, i);
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
-
- for (int tries = 0; tries < numRetries; tries++) {
- try {
- server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("updated columns in row: " + i.regionName);
- }
- break;
-
- } catch (IOException e) {
- if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- LOG.error("column update failed in row: " + i.regionName, e);
- break;
- }
- }
- try {
- Thread.sleep(threadWakeFrequency);
-
- } catch (InterruptedException e) {
- // continue
- }
+ server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updated columns in row: " + i.regionName);
}
if (online) { // Bring offline regions on-line
@@ -2795,7 +2814,7 @@
}
protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
- throws IOException {
+ throws IOException {
i.offLine = !online;
b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
@@ -2815,7 +2834,7 @@
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
- throws IOException {
+ throws IOException {
// For regions that are being served, mark them for deletion
@@ -2853,6 +2872,7 @@
}
private abstract class ColumnOperation extends TableOperation {
+
protected ColumnOperation(Text tableName) throws IOException {
super(tableName);
}
@@ -2874,31 +2894,9 @@
BatchUpdate b = new BatchUpdate(rand.nextLong());
long lockid = b.startUpdate(i.regionName);
b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
-
- for (int tries = 0; tries < numRetries; tries++) {
- try {
- server.batchUpdate(regionName, System.currentTimeMillis(), b);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("updated columns in row: " + i.regionName);
- }
- break;
-
- } catch (IOException e) {
- if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- }
- LOG.error("column update failed in row: " + i.regionName, e);
- break;
- }
- }
- try {
- Thread.sleep(threadWakeFrequency);
-
- } catch (InterruptedException e) {
- // continue
- }
+ server.batchUpdate(regionName, System.currentTimeMillis(), b);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updated columns in row: " + i.regionName);
}
}
}
@@ -2914,7 +2912,7 @@
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
- throws IOException {
+ throws IOException {
for (HRegionInfo i: unservedRegions) {
i.tableDesc.families().remove(columnName);
@@ -2922,27 +2920,8 @@
// Delete the directories used by the column
- try {
- fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
-
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- LOG.error("", e);
- }
-
- try {
- fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
-
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- LOG.error("", e);
- }
+ fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
+ fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
}
}
}
@@ -2958,7 +2937,7 @@
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
- throws IOException {
+ throws IOException {
for (HRegionInfo i: unservedRegions) {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Fri Sep 7 20:09:42 2007
@@ -334,7 +334,9 @@
} catch (RuntimeException ex) {
LOG.error("error initializing HMemcache scanner: ", ex);
close();
- throw ex;
+ IOException e = new IOException("error initializing HMemcache scanner");
+ e.initCause(ex);
+ throw e;
} catch(IOException ex) {
LOG.error("error initializing HMemcache scanner: ", ex);
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Sep 7 20:09:42 2007
@@ -52,20 +52,20 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
-/*******************************************************************************
+/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
- ******************************************************************************/
+ */
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
- throws IOException {
+ throws IOException {
+
if (protocol.equals(HRegionInterface.class.getName())) {
return HRegionInterface.versionID;
}
@@ -79,7 +79,8 @@
// of HRegionServer in isolation.
protected volatile boolean stopRequested;
- // Go down hard. Used debugging and in unit tests.
+ // Go down hard. Used if file system becomes unavailable and also in
+ // debugging and unit tests.
protected volatile boolean abortRequested;
final Path rootDir;
@@ -146,23 +147,23 @@
* {@inheritDoc}
*/
public void run() {
- while(!stopRequested) {
+ while (!stopRequested) {
long startTime = System.currentTimeMillis();
- synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
+ synchronized (splitOrCompactLock) { // Don't interrupt us while we're working
// Grab a list of regions to check
- Vector<HRegion> regionsToCheck = new Vector<HRegion>();
+ ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
lock.readLock().lock();
try {
regionsToCheck.addAll(onlineRegions.values());
} finally {
lock.readLock().unlock();
}
- try {
- for(HRegion cur: regionsToCheck) {
- if(cur.isClosed()) {
- // Skip if closed
- continue;
- }
+ for(HRegion cur: regionsToCheck) {
+ if(cur.isClosed()) {
+ // Skip if closed
+ continue;
+ }
+ try {
if (cur.needsCompaction()) {
cur.compactStores();
}
@@ -172,10 +173,13 @@
if (cur.needsSplit(midKey)) {
split(cur, midKey);
}
+ } catch(IOException e) {
+ //TODO: What happens if this fails? Are we toast?
+ LOG.error("Split or compaction failed", e);
+ if (!checkFileSystem()) {
+ break;
+ }
}
- } catch(IOException e) {
- //TODO: What happens if this fails? Are we toast?
- LOG.error("What happens if this fails? Are we toast?", e);
}
}
@@ -198,7 +202,8 @@
}
private void split(final HRegion region, final Text midKey)
- throws IOException {
+ throws IOException {
+
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.closeAndSplit(midKey, this);
@@ -286,7 +291,7 @@
synchronized(cacheFlusherLock) {
// Grab a list of items to flush
- Vector<HRegion> toFlush = new Vector<HRegion>();
+ ArrayList<HRegion> toFlush = new ArrayList<HRegion>();
lock.readLock().lock();
try {
toFlush.addAll(onlineRegions.values());
@@ -310,7 +315,10 @@
iex = x;
}
}
- LOG.error("", iex);
+ LOG.error("Cache flush failed", iex);
+ if (!checkFileSystem()) {
+ break;
+ }
}
}
}
@@ -332,7 +340,7 @@
// File paths
- private FileSystem fs;
+ FileSystem fs;
// Logging
@@ -368,7 +376,10 @@
iex = x;
}
}
- LOG.warn("", iex);
+ LOG.error("Log rolling failed", iex);
+ if (!checkFileSystem()) {
+ break;
+ }
}
}
}
@@ -737,7 +748,7 @@
e = ex;
}
}
- LOG.warn("Abort close of log", e);
+ LOG.error("Unable to close log in abort", e);
}
closeAllRegions(); // Don't leave any open file handles
LOG.info("aborting server at: " +
@@ -902,6 +913,9 @@
}
} else {
LOG.error("unable to process message: " + e.msg.toString(), ie);
+ if (!checkFileSystem()) {
+ break;
+ }
}
}
}
@@ -973,117 +987,246 @@
return regionsToClose;
}
- //////////////////////////////////////////////////////////////////////////////
+ //
// HRegionInterface
- //////////////////////////////////////////////////////////////////////////////
+ //
/** {@inheritDoc} */
public HRegionInfo getRegionInfo(final Text regionName)
- throws NotServingRegionException {
+ throws NotServingRegionException {
+
requestCount.incrementAndGet();
return getRegion(regionName).getRegionInfo();
}
/** {@inheritDoc} */
public byte [] get(final Text regionName, final Text row,
- final Text column)
- throws IOException {
+ final Text column) throws IOException {
+
requestCount.incrementAndGet();
- return getRegion(regionName).get(row, column);
+ try {
+ return getRegion(regionName).get(row, column);
+
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
}
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row,
- final Text column, final int numVersions)
- throws IOException {
+ final Text column, final int numVersions) throws IOException {
+
requestCount.incrementAndGet();
- return getRegion(regionName).get(row, column, numVersions);
+ try {
+ return getRegion(regionName).get(row, column, numVersions);
+
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
}
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
+
requestCount.incrementAndGet();
- return getRegion(regionName).get(row, column, timestamp, numVersions);
+ try {
+ return getRegion(regionName).get(row, column, timestamp, numVersions);
+
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
}
/** {@inheritDoc} */
public MapWritable getRow(final Text regionName, final Text row)
- throws IOException {
+ throws IOException {
+
requestCount.incrementAndGet();
- HRegion region = getRegion(regionName);
- MapWritable result = new MapWritable();
- TreeMap<Text, byte[]> map = region.getFull(row);
- for (Map.Entry<Text, byte []> es: map.entrySet()) {
- result.put(new HStoreKey(row, es.getKey()),
- new ImmutableBytesWritable(es.getValue()));
+ try {
+ HRegion region = getRegion(regionName);
+ MapWritable result = new MapWritable();
+ TreeMap<Text, byte[]> map = region.getFull(row);
+ for (Map.Entry<Text, byte []> es: map.entrySet()) {
+ result.put(new HStoreKey(row, es.getKey()),
+ new ImmutableBytesWritable(es.getValue()));
+ }
+ return result;
+
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
}
- return result;
}
/** {@inheritDoc} */
- public MapWritable next(final long scannerId)
- throws IOException {
+ public MapWritable next(final long scannerId) throws IOException {
+
requestCount.incrementAndGet();
- String scannerName = String.valueOf(scannerId);
- HInternalScannerInterface s = scanners.get(scannerName);
- if (s == null) {
- throw new UnknownScannerException("Name: " + scannerName);
- }
- leases.renewLease(scannerId, scannerId);
-
- // Collect values to be returned here
-
- MapWritable values = new MapWritable();
-
- // Keep getting rows until we find one that has at least one non-deleted column value
-
- HStoreKey key = new HStoreKey();
- TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
- while (s.next(key, results)) {
- for(Map.Entry<Text, byte []> e: results.entrySet()) {
- HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
- byte [] val = e.getValue();
- if (HGlobals.deleteBytes.compareTo(val) == 0) {
- // Column value is deleted. Don't return it.
- continue;
+ try {
+ String scannerName = String.valueOf(scannerId);
+ HInternalScannerInterface s = scanners.get(scannerName);
+ if (s == null) {
+ throw new UnknownScannerException("Name: " + scannerName);
+ }
+ leases.renewLease(scannerId, scannerId);
+
+ // Collect values to be returned here
+
+ MapWritable values = new MapWritable();
+
+ // Keep getting rows until we find one that has at least one non-deleted column value
+
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
+ while (s.next(key, results)) {
+ for(Map.Entry<Text, byte []> e: results.entrySet()) {
+ HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
+ byte [] val = e.getValue();
+ if (HGlobals.deleteBytes.compareTo(val) == 0) {
+ // Column value is deleted. Don't return it.
+ continue;
+ }
+ values.put(k, new ImmutableBytesWritable(val));
+ }
+
+ if(values.size() > 0) {
+ // Row has something in it. Return the value.
+ break;
}
- values.put(k, new ImmutableBytesWritable(val));
+
+ // No data for this row, go get another.
+
+ results.clear();
}
+ return values;
- if(values.size() > 0) {
- // Row has something in it. Return the value.
- break;
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
+ throws IOException {
+
+ requestCount.incrementAndGet();
+ try {
+ long lockid = startUpdate(regionName, b.getRow());
+ for(BatchOperation op: b) {
+ switch(op.getOp()) {
+ case BatchOperation.PUT_OP:
+ put(regionName, lockid, op.getColumn(), op.getValue());
+ break;
+
+ case BatchOperation.DELETE_OP:
+ delete(regionName, lockid, op.getColumn());
+ break;
+ }
}
+ commit(regionName, lockid, timestamp);
- // No data for this row, go get another.
-
- results.clear();
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
}
- return values;
}
+
+ //
+ // remote scanner interface
+ //
/** {@inheritDoc} */
- public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
- throws IOException {
+ public long openScanner(Text regionName, Text[] cols, Text firstRow,
+ final long timestamp, final RowFilterInterface filter)
+ throws IOException {
+
requestCount.incrementAndGet();
- long lockid = startUpdate(regionName, b.getRow());
- for(BatchOperation op: b) {
- switch(op.getOp()) {
- case BatchOperation.PUT_OP:
- put(regionName, lockid, op.getColumn(), op.getValue());
- break;
+ try {
+ HRegion r = getRegion(regionName);
+ long scannerId = -1L;
+ HInternalScannerInterface s =
+ r.getScanner(cols, firstRow, timestamp, filter);
+ scannerId = rand.nextLong();
+ String scannerName = String.valueOf(scannerId);
+ synchronized(scanners) {
+ scanners.put(scannerName, s);
+ }
+ leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
+ return scannerId;
- case BatchOperation.DELETE_OP:
- delete(regionName, lockid, op.getColumn());
- break;
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ try {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ } catch (IOException x) {
+ e = x;
+ }
}
+ LOG.error("", e);
+ checkFileSystem();
+ throw e;
}
- commit(regionName, lockid, timestamp);
}
- protected long startUpdate(Text regionName, Text row)
- throws IOException {
+ /** {@inheritDoc} */
+ public void close(final long scannerId) throws IOException {
+ requestCount.incrementAndGet();
+ try {
+ String scannerName = String.valueOf(scannerId);
+ HInternalScannerInterface s = null;
+ synchronized(scanners) {
+ s = scanners.remove(scannerName);
+ }
+ if(s == null) {
+ throw new UnknownScannerException(scannerName);
+ }
+ s.close();
+ leases.cancelLease(scannerId, scannerId);
+
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ Map<String, HInternalScannerInterface> scanners =
+ Collections.synchronizedMap(new HashMap<String,
+ HInternalScannerInterface>());
+
+ /**
+ * Instantiated as a scanner lease.
+ * If the lease times out, the scanner is closed
+ */
+ private class ScannerListener implements LeaseListener {
+ private final String scannerName;
+
+ ScannerListener(final String n) {
+ this.scannerName = n;
+ }
+
+ /** {@inheritDoc} */
+ public void leaseExpired() {
+ LOG.info("Scanner " + this.scannerName + " lease expired");
+ HInternalScannerInterface s = null;
+ synchronized(scanners) {
+ s = scanners.remove(this.scannerName);
+ }
+ if (s != null) {
+ s.close();
+ }
+ }
+ }
+
+ //
+ // Methods that do the actual work for the remote API
+ //
+
+ protected long startUpdate(Text regionName, Text row) throws IOException {
HRegion region = getRegion(regionName);
return region.startUpdate(row);
@@ -1097,7 +1240,7 @@
}
protected void delete(Text regionName, long lockid, Text column)
- throws IOException {
+ throws IOException {
HRegion region = getRegion(regionName);
region.delete(lockid, column);
@@ -1117,7 +1260,8 @@
* @throws NotServingRegionException
*/
protected HRegion getRegion(final Text regionName)
- throws NotServingRegionException {
+ throws NotServingRegionException {
+
return getRegion(regionName, false);
}
@@ -1129,9 +1273,9 @@
* @return {@link HRegion} for <code>regionName</code>
* @throws NotServingRegionException
*/
- protected HRegion getRegion(final Text regionName,
- final boolean checkRetiringRegions)
- throws NotServingRegionException {
+ protected HRegion getRegion(final Text regionName,
+ final boolean checkRetiringRegions) throws NotServingRegionException {
+
HRegion region = null;
this.lock.readLock().lock();
try {
@@ -1154,91 +1298,28 @@
this.lock.readLock().unlock();
}
}
-
- //////////////////////////////////////////////////////////////////////////////
- // remote scanner interface
- //////////////////////////////////////////////////////////////////////////////
-
- Map<String, HInternalScannerInterface> scanners =
- Collections.synchronizedMap(new HashMap<String,
- HInternalScannerInterface>());
-
- /**
- * Instantiated as a scanner lease.
- * If the lease times out, the scanner is closed
- */
- private class ScannerListener implements LeaseListener {
- private final String scannerName;
-
- ScannerListener(final String n) {
- this.scannerName = n;
- }
-
- /**
- * {@inheritDoc}
- */
- public void leaseExpired() {
- LOG.info("Scanner " + this.scannerName + " lease expired");
- HInternalScannerInterface s = null;
- synchronized(scanners) {
- s = scanners.remove(this.scannerName);
- }
- if (s != null) {
- s.close();
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public long openScanner(Text regionName, Text[] cols, Text firstRow,
- final long timestamp, final RowFilterInterface filter)
- throws IOException {
- requestCount.incrementAndGet();
- HRegion r = getRegion(regionName);
- long scannerId = -1L;
- try {
- HInternalScannerInterface s =
- r.getScanner(cols, firstRow, timestamp, filter);
- scannerId = rand.nextLong();
- String scannerName = String.valueOf(scannerId);
- synchronized(scanners) {
- scanners.put(scannerName, s);
- }
- leases.createLease(scannerId, scannerId,
- new ScannerListener(scannerName));
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- } catch (IOException x) {
- e = x;
- }
- }
- LOG.error("", e);
- throw e;
- }
- return scannerId;
- }
/**
- * {@inheritDoc}
+ * Checks to see if the file system is still accessible.
+ * If not, sets abortRequested and stopRequested
+ *
+ * @return false if file system is not available
*/
- public void close(final long scannerId) throws IOException {
- requestCount.incrementAndGet();
- String scannerName = String.valueOf(scannerId);
- HInternalScannerInterface s = null;
- synchronized(scanners) {
- s = scanners.remove(scannerName);
- }
- if(s == null) {
- throw new UnknownScannerException(scannerName);
+ protected boolean checkFileSystem() {
+ boolean fsOk = true;
+ if (!FSUtils.isFileSystemAvailable(fs)) {
+ LOG.fatal("Shutting down HRegionServer: file system not available");
+ abortRequested = true;
+ stopRequested = true;
+ fsOk = false;
}
- s.close();
- leases.cancelLease(scannerId, scannerId);
+ return fsOk;
}
+ //
+ // Main program and support routines
+ //
+
private static void printUsageAndExit() {
printUsageAndExit(null);
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Fri Sep 7 20:09:42 2007
@@ -1298,6 +1298,9 @@
} catch (Exception ex) {
LOG.error("Failed construction", ex);
close();
+ IOException e = new IOException("HStoreScanner failed construction");
+ e.initCause(ex);
+ throw e;
}
}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=573777&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java Fri Sep 7 20:09:42 2007
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.dfs.DistributedFileSystem;
+
+/**
+ * Utility methods for interacting with the underlying file system.
+ */
+public class FSUtils {
+ private static final Log LOG = LogFactory.getLog(FSUtils.class);
+
+ private FSUtils() {} // not instantiable
+
+ /**
+ * Checks to see if the specified file system is available
+ *
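+ * @param fs the file system to check
+ * @return true if fs is not a DistributedFileSystem, or if it is one and reports at least one data node; false if it has no data nodes or querying them throws an IOException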
+ */
+ public static boolean isFileSystemAvailable(FileSystem fs) {
+ boolean available = false;
+ if (fs instanceof DistributedFileSystem) {
+ try {
+ if (((DistributedFileSystem) fs).getDataNodeStats().length > 0) {
+ available = true;
+
+ } else {
+ LOG.fatal("file system unavailable: no data nodes");
+ }
+
+ } catch (IOException e) {
+ LOG.fatal("file system unavailable because: ", e);
+ }
+
+ } else {
+ available = true;
+ }
+ return available;
+ }
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Sep 7 20:09:42 2007
@@ -310,6 +310,32 @@
}
/**
+ * Wait for Mini HBase Cluster to shut down.
+ */
+ public void join() {
+   // An interrupt received while waiting is remembered rather than discarded;
+   // it is re-asserted on the calling thread once every thread has been
+   // waited on, so the caller can still observe it.
+   boolean interrupted = false;
+   if (regionThreads != null) {
+     synchronized (regionThreads) {
+       for (Thread t : regionThreads) {
+         if (t.isAlive()) {
+           try {
+             t.join();
+           } catch (InterruptedException e) {
+             // give up on this thread and continue with the next one
+             interrupted = true;
+           }
+         }
+       }
+     }
+   }
+   if (masterThread != null && masterThread.isAlive()) {
+     try {
+       masterThread.join();
+     } catch (InterruptedException e) {
+       interrupted = true;
+     }
+   }
+   if (interrupted) {
+     // catching InterruptedException cleared the flag; restore it
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
* Shut down HBase cluster started by calling
* {@link #startMaster(Configuration)} and then
* {@link #startRegionServers(Configuration, int)};
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java Fri Sep 7 20:09:42 2007
@@ -61,31 +61,32 @@
value = System.getenv("DEBUGGING");
if(value != null && value.equalsIgnoreCase("TRUE")) {
debugging = true;
+ }
- Logger rootLogger = Logger.getRootLogger();
- // rootLogger.setLevel(Level.WARN);
+ Logger rootLogger = Logger.getRootLogger();
+ rootLogger.setLevel(Level.WARN);
- Level logLevel = Level.INFO;
- value = System.getenv("LOGGING_LEVEL");
- if(value != null && value.length() != 0) {
- if(value.equalsIgnoreCase("ALL")) {
- logLevel = Level.ALL;
- } else if(value.equalsIgnoreCase("DEBUG")) {
- logLevel = Level.DEBUG;
- } else if(value.equalsIgnoreCase("ERROR")) {
- logLevel = Level.ERROR;
- } else if(value.equalsIgnoreCase("FATAL")) {
- logLevel = Level.FATAL;
- } else if(value.equalsIgnoreCase("INFO")) {
- logLevel = Level.INFO;
- } else if(value.equalsIgnoreCase("OFF")) {
- logLevel = Level.OFF;
- } else if(value.equalsIgnoreCase("TRACE")) {
- logLevel = Level.TRACE;
- } else if(value.equalsIgnoreCase("WARN")) {
- logLevel = Level.WARN;
- }
+ Level logLevel = Level.DEBUG;
+ value = System.getenv("LOGGING_LEVEL");
+ if(value != null && value.length() != 0) {
+ if(value.equalsIgnoreCase("ALL")) {
+ logLevel = Level.ALL;
+ } else if(value.equalsIgnoreCase("DEBUG")) {
+ logLevel = Level.DEBUG;
+ } else if(value.equalsIgnoreCase("ERROR")) {
+ logLevel = Level.ERROR;
+ } else if(value.equalsIgnoreCase("FATAL")) {
+ logLevel = Level.FATAL;
+ } else if(value.equalsIgnoreCase("INFO")) {
+ logLevel = Level.INFO;
+ } else if(value.equalsIgnoreCase("OFF")) {
+ logLevel = Level.OFF;
+ } else if(value.equalsIgnoreCase("TRACE")) {
+ logLevel = Level.TRACE;
+ } else if(value.equalsIgnoreCase("WARN")) {
+ logLevel = Level.WARN;
}
+
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = rootLogger.getAllAppenders();
e.hasMoreElements();) {
@@ -103,8 +104,8 @@
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
- Logger.getLogger(
- HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
}
+ Logger.getLogger(
+ HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
}
}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java?rev=573777&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java Fri Sep 7 20:09:42 2007
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Test ability of HBase to handle DFS failure
+ */
+public class TestDFSAbort extends HBaseClusterTestCase {
+
+  /**
+   * Lowers the IPC and HBase client timeout/retry settings, sets the root
+   * logger to WARN and the logger for this class's package to DEBUG.
+   */
+  public TestDFSAbort() {
+    super();
+    // reduce ipc client timeout and number of retries
+    conf.setInt("ipc.client.timeout", 5000);
+    conf.setInt("ipc.client.connect.max.retries", 5);
+    // reduce HBase retries
+    conf.setInt("hbase.client.retries.number", 5);
+
+    Logger.getRootLogger().setLevel(Level.WARN);
+    String packageName = getClass().getPackage().getName();
+    Logger.getLogger(packageName).setLevel(Level.DEBUG);
+  }
+
+  /**
+   * Runs the superclass set-up, then creates a table named after the test
+   * with a single column family.
+   *
+   * @throws Exception if the superclass set-up or the table creation fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    HTableDescriptor tableDesc = new HTableDescriptor(getName());
+    tableDesc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
+    new HBaseAdmin(conf).createTable(tableDesc);
+  }
+
+  /**
+   * Shuts down the mini DFS cluster and then waits for the mini HBase
+   * cluster to shut down.
+   *
+   * @throws Exception
+   */
+  public void testDFSAbort() throws Exception {
+    // By now the Mini DFS is running, Mini HBase is running and the table
+    // has been created, so yank the rug out from under HBase ...
+    cluster.getDFSCluster().shutdown();
+
+    // ... and wait for the Mini HBase Cluster to shut down.
+    cluster.join();
+  }
+}